#!/usr/bin/env python3
"""A concurrent wrapper for timing xmltestrunner tests for buildout.coredev."""
from argparse import ArgumentParser
from fnmatch import fnmatch
from logging import Formatter
from logging import getLogger
from logging import INFO
from logging import StreamHandler
from logging.handlers import MemoryHandler
from math import ceil
from multiprocessing import cpu_count
from multiprocessing import Pool
from os import access
from os import environ
from os import killpg
from os import path
from os import pathsep
from os import setpgrp
from os import unlink
from os import walk
from os import X_OK
from signal import SIGINT
from signal import SIGKILL
from signal import signal
from subprocess import CalledProcessError
from subprocess import check_output
from subprocess import DEVNULL
from subprocess import STDOUT
from time import sleep
from time import time

import locale
import re
import sys


def which(program):
    def is_exe(fpath):
        return path.isfile(fpath) and access(fpath, X_OK)

    fpath, fname = path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        for dpath in environ["PATH"].split(pathsep):
            exe_file = path.join(dpath, program)
            if is_exe(exe_file):
                return exe_file

    return None


def humanize_time(seconds):
    """Humanize a seconds based delta time.

    Only handles time spans up to weeks for simplicity.
    """
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    days, hours = divmod(hours, 24)
    weeks, days = divmod(days, 7)
    seconds = int(seconds)
    minutes = int(minutes)
    hours = int(hours)
    days = int(days)
    weeks = int(weeks)
    output = []
    if weeks:
        quantifier = "weeks" if weeks > 1 or weeks == 0 else "week"
        output.append("{} {}".format(weeks, quantifier))
    if days:
        quantifier = "days" if days > 1 or days == 0 else "day"
        output.append("{} {}".format(days, quantifier))
    if hours:
        quantifier = "hours" if hours > 1 or hours == 0 else "hour"
        output.append("{} {}".format(hours, quantifier))
    if minutes:
        quantifier = "minutes" if minutes > 1 or minutes == 0 else "minute"
        output.append("{} {}".format(minutes, quantifier))
    quantifier = "seconds" if seconds > 1 or seconds == 0 else "second"
    output.append("{} {}".format(seconds, quantifier))
    return " ".join(output)


def setup_termination():
    # Set the group flag so that subprocesses will be in the same group.
    setpgrp()

    def terminate(signum, frame):
        # Kill the group (including main process) on terminal signal.
        killpg(0, SIGKILL)

    signal(SIGINT, terminate)


def discover_tests():
    """List tests per suite and count them per layer and testclass."""
    logger.info("Discovering tests.")
    batches = {}
    # ``layer`` is set once a "Listing <layer> tests:" heading has been seen.
    layer = None
    with Pool(CONCURRENCY) as p:
        for suite, result in p.imap_unordered(
            fetch_test_discovery_output,
            ((suite, cmdline) for suite, cmdline in TEST_SUITES.items()),
        ):
            batches[suite] = {}
            for line in result:
                if line.startswith("Listing"):
                    layer = re.search("^Listing (.*) tests:", line).groups()[0]
                    batches[suite][layer] = {}
                # All listed tests are indented with 2 spaces
                if layer and line.startswith("  "):
                    if "(" in line:
                        classname = re.search(r".*\((.*)\).*", line).groups()[0]
                    else:
                        # Some doctests have a filename:testcase convention
                        classname = re.search("([^:]*)", line).groups()[0].strip()
                    # Count discovered tests per layer per testclass
                    if not batches.get(suite).get(layer).get(classname):
                        batches[suite][layer][classname] = {"layer": layer, "count": 0}
                    batches[suite][layer][classname]["count"] += 1
    return batches


def fetch_test_discovery_output(suite_discovery_arguments):
    # Pool().imap_unordered() does not have a starmap mechanism
    suite, cmdline = suite_discovery_arguments
    output_encoding = sys.stdout.encoding
    if output_encoding is None:
        output_encoding = locale.getpreferredencoding()
    output = check_output(cmdline, stderr=DEVNULL)
    return suite, output.decode(output_encoding).splitlines()


def chunks(chunkable, chunksize):
    """Greedily group (classname, info) items until a group holds ~chunksize tests."""
    output = []
    while chunkable:
        output.append([])
        size = 0
        while chunkable and size < chunksize:
            output[-1].append(chunkable.pop(0))
            size = sum(testclass[-1].get("count") for testclass in output[-1])
    return output


def split(batch, chunk_count=0):
    """Split a batch into up to chunk_count smaller batches of testclasses."""
    suite = batch.get("suite")
    layers = batch.get("layers")
    if not chunk_count:
        chunk_count = sum(layer_chunk_counts.get(layer, 0) for layer in layers)
    if chunk_count < 2 or CONCURRENCY < 2:
        batch["count"] = batch.get("original_count")
        return (batch,)
    original_count = batch["original_count"]
    chunksize = int(ceil(original_count / chunk_count))
    splinters = []
    for chunk in chunks(list(batch.get("testclasses").items()), chunksize):
        splinters.append(
            {
                "suite": suite,
                "layers": tuple(set(testclass[1].get("layer") for testclass in chunk)),
                "testclasses": dict(chunk),
                "original_count": original_count,
                "count": sum(testclass[1].get("count") for testclass in chunk),
            }
        )
    return tuple(sorted(splinters, key=splinter_sorter))


def splinter_sorter(splinter):
    # Largest chunks first.
    return -splinter.get("count")


def create_test_run_params():
    """Turn discovered tests into sorted batches of parameters for run_tests()."""
    test_run_params = []
    mergeable_batches = []
    for suite, layers in discover_tests().items():
        mergeable_batches.append(
            {
                "suite": suite,
                "layers": [],
                "testclasses": {},
                "original_count": 0,
                "count": 0,
            }
        )
        for layer, testclasses in layers.items():
            batch = {}
            batch["suite"] = suite
            batch["layers"] = (layer,)
            batch["testclasses"] = testclasses
            batch["original_count"] = sum(
                testclass.get("count") for testclass in testclasses.values()
            )
            split_batches = split(batch)
            for i, split_batch in enumerate(split_batches):
                split_batch["batchinfo"] = "{}/{}".format(i + 1, len(split_batches))
                if len(split_batches) < 2 and not any(
                    split_layer in layer_chunk_counts
                    for split_layer in split_batch["layers"]
                ):
                    # Unsplit batches of layers not listed in layer_chunk_counts
                    # get pooled per suite and re-split below.
                    mergeable = mergeable_batches[-1]
                    mergeable["layers"] += split_batch["layers"]
                    mergeable["testclasses"].update(split_batch["testclasses"])
                    mergeable["original_count"] += split_batch["original_count"]
                    mergeable["count"] += split_batch["count"]
                else:
                    test_run_params.append(split_batch)
    for batch in mergeable_batches:
        # Split the pooled leftovers into jobs of roughly 512 tests each.
        chunk_count = int(ceil(batch.get("count") / 512))
        split_batches = split(batch, chunk_count=chunk_count)
        for i, split_batch in enumerate(split_batches):
            split_batch["batchinfo"] = "{}/{}".format(i + 1, len(split_batches))
            test_run_params.append(split_batch)
    return tuple(sorted(test_run_params, key=test_batch_sorter))


def test_batch_sorter(batch):
    # First we start the split megabatches,
    # then we start all the known-slow layers.
    layers = batch.get("layers")
    return (
        1 if len(layers) < 2 else 0,
        -sum(layer_chunk_counts.get(layer, 0) for layer in layers),
        batch.get("batchinfo"),
    )


def remove_bytecode_files(directory_path):
    logger.info("Removing bytecode files from %s", directory_path)
    for filename in find_bytecode_files(directory_path):
        unlink(filename)


def find_bytecode_files(directory_path):
    for root, _, files in walk(directory_path):
        for name in files:
            if fnmatch(name, "*.py[co]"):
                yield path.join(root, name)


def run_tests(test_run_params):
    """Run and time 'bin/test --all --xml --layer layer -t testclass [-t testclass]'.

    Return the layers, returncode and runtime.
    """
    params = ["bin/test"]
    params.append("--all")
    params.append("--xml")
    suite = test_run_params.get("suite")
    layers = test_run_params.get("layers")
    testclasses = test_run_params.get("testclasses")
    batchinfo = test_run_params.get("batchinfo", "1/1")
    count = test_run_params.get("count")
    if layers:
        for layer in layers:
            params.append("--layer")
            params.append(layer)
    if testclasses:
        for testclass in testclasses:
            params.append("-t")
            params.append(testclass)
    xvfb_screen_size = environ.get("XVFB_SCREEN_SIZE")
    if suite == "robot" and xvfb_screen_size:
        # Wrap Robot runs in a virtual framebuffer so the browser has a display.
        xvfb_params = [
            "xvfb-run",
            "-a",
            "--server-args=-screen 0 {}".format(xvfb_screen_size),
        ]
        params = tuple(xvfb_params + params)
    else:
        params = tuple(params)
    printable_params = " ".join(
        ["'{}'".format(param) if " " in param else param for param in params]
    )
    logger.info(
        "START - %s - %s - %s - %d %s",
        suite,
        " ".join(layers),
        batchinfo,
        count,
        "test" if count == 1 else "tests",
    )
    memory_handler.flush()
    start = time()
    try:
        output = check_output(params, stderr=STDOUT, universal_newlines=True)
        returncode = 0
    except CalledProcessError as e:
        output = e.output
        returncode = e.returncode
    runtime = time() - start
    result = {
        "layers": layers,
        "returncode": returncode,
        "runtime": runtime,
    }
    done_args = (
        "DONE - %s - %s - %s - %d %s in %s",
        suite,
        " ".join(layers),
        batchinfo,
        count,
        "test" if count == 1 else "tests",
        humanize_time(runtime),
    )
    if returncode:
        log_output.error("")
        for line in output.splitlines():
            log_output.error(line)
        log_output.error("")
        log_output.error("Command line")
        log_output.error("")
        log_output.error(printable_params)
        log_output.error("")
        stdout_handler.flush()
        logger.error(*done_args)
    else:
        logger.info(*done_args)
    memory_handler.flush()
    return result


def main():
    """Discover and time tests in parallel via multiprocessing.Pool()."""
    # Remove *.py[co] files to avoid race conditions with parallel workers
    # stepping on each other's toes when trying to clean up stale bytecode.
    #
    # Setting PYTHONDONTWRITEBYTECODE is not enough, because running buildout
    # also already precompiles bytecode for some eggs.
    remove_bytecode_files(SOURCE_PATH)

    start = time()
    test_run_params = create_test_run_params()
    batch_count = len(test_run_params)
    logger.info("Discovered tests in %s", humanize_time(time() - start))
    logger.info("Split the tests into %d jobs", batch_count)
    logger.info("Running the jobs in up to %d processes in parallel", CONCURRENCY)
    # We need to explicitly flush here in order to avoid multiprocessing
    # related log output duplications due to pickling inputs and globals as
    # the default IPC mechanism.
    memory_handler.flush()
    results = []
    start = time()
    with Pool(CONCURRENCY) as p:
        for params in test_run_params:
            # Alleviate pickle load bursting for IPC
            sleep(0.25)
            results.append(p.apply_async(run_tests, (params,)))
        success, runtime = handle_results(results)
    logger.info("Ran %d tests.", sum(batch.get("count") for batch in test_run_params))
    logger.info("Aggregate runtime %s.", humanize_time(runtime))
    logger.info("Wallclock runtime %s.", humanize_time(time() - start))
    return success


def handle_results(results):
    """Wait for the async results, sum their runtimes and collect failed layers."""
    failed_layers = set()
    job_count = len(results)
    mature_results = []
    runtime = 0
    while results:
        sleep(1)
        for i, result in enumerate(results):
            if result.ready():
                mature_results.append(results.pop(i).get())
                # Break so the enumeration index is not off after the .pop().
                break
    for result in mature_results:
        runtime += result.get("runtime")
        returncode = result.get("returncode", 1)
        if returncode:
            for layer in result.get("layers"):
                failed_layers.add(layer)
    error_count = len(failed_layers)
    if error_count == 0:
        logger.info("No failed tests.")
        return True, runtime
    logger.error("Failures in %d layers across %d jobs.", error_count, job_count)
    for layer in failed_layers:
        logger.error("Failures in %s", layer)
    return False, runtime


# Having the __main__ guard is necessary for multiprocessing.Pool().
if __name__ == "__main__":
    # SIGINT kills the whole process group dead.
    setup_termination()

    # Globals
    environ["PYTHONDONTWRITEBYTECODE"] = "1"
    environ["PYTHONUNBUFFERED"] = "1"
    environ["ROBOTSUITE_PREFIX"] = "ROBOT"
    CONCURRENCY = cpu_count()
    BUILDOUT_PATH = path.abspath(path.join(__file__, "..", ".."))
    SOURCE_PATH = path.join(BUILDOUT_PATH, "src")

    # Logging
    default_loglevel = INFO
    logger = getLogger("mtest")
    logger.setLevel(default_loglevel)

    # Set up logging to stdout
    stream_handler = StreamHandler()
    stream_handler.setLevel(default_loglevel)
    log_formatter = Formatter(
        " - ".join(("%(asctime)s", "%(levelname)s", "%(message)s"))
    )
    stream_handler.setFormatter(log_formatter)
    # Buffer log messages so we do not get lines broken by race conditions.
    memory_handler = MemoryHandler(2, target=stream_handler)
    memory_handler.setLevel(default_loglevel)
    logger.addHandler(memory_handler)

    # Set up a separate logger for writing failure output to stdout. We do
    # this because the logging module handles I/O encoding properly, whereas
    # with 'print' we'd need to do it ourselves. (Think piping the output of
    # bin/mtest somewhere, or shell I/O redirection.)
    log_output = getLogger("mtest.output")
    log_output.propagate = False
    stdout_handler = StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(Formatter(""))
    log_output.addHandler(stdout_handler)
    log_output.setLevel(INFO)

    # CLI arguments
    parser = ArgumentParser(
        description="Run tests in parallel.",
        epilog="At least one of --plone or --robot is required. "
        "You must also define either chromedriver or geckodriver for Robot tests.",
    )
    parser.add_argument("--plone", action="store_true", help="Run Dexterity tests.")
    parser.add_argument(
        "--robot",
        help="Run Robot tests with the Selenium driver you define "
        "(chromedriver or geckodriver).",
    )
    parser.add_argument(
        "--screen",
        help="Run Robot tests with a defined XVFB screen size. "
        "Only applies if 'xvfb-run' is on your $PATH. "
        "Defaults to 1920x1200x24.",
        default="1920x1200x24",
    )
    parser.add_argument(
        "-j",
        "--jobs",
        type=int,
        help="Set the testing concurrency level. "
        "Defaults to the number of logical CPUs.",
    )
    args = parser.parse_args()

    if not any((args.plone, args.robot)):
        parser.print_help()
        exit(1)

    if args.jobs:
        CONCURRENCY = int(args.jobs)

    TEST_SUITES = {}
    if args.plone:
        TEST_SUITES["plone"] = ("bin/test", "--all", "--list-tests", "-t", "!ROBOT")
    if args.robot:
        TEST_SUITES["robot"] = ("bin/test", "--all", "--list-tests", "-t", "ROBOT")
        robot_browsers = {"chromedriver": "chrome", "geckodriver": "firefox"}
        browser = robot_browsers.get(args.robot)
        if browser:
            environ["ROBOT_BROWSER"] = browser
        else:
            parser.print_help()
            exit(1)
        logger.info("Selenium browser: %s", browser)
        if which("xvfb-run"):
            screen = args.screen
            logger.info(
                "Using 'xvfb-run -a' to wrap the tests with a screen size "
                "of {}.".format(screen)
            )
            environ["XVFB_SCREEN_SIZE"] = screen

    # Here we do manual gardening of known-slow and/or splittable layers.
    layer_chunk_counts = {
        "plone.api.tests.base.PloneApiLayer:Integration": 1,
        "plone.app.content.testing.PloneAppContentDX:Functional": 1,
        "plone.app.contenttypes.testing.PloneAppContenttypes:Functional": 1,
        "plone.app.contenttypes.testing.PloneAppContenttypes:Integration": 1,
        "plone.app.contenttypes.testing.PloneAppContenttypes:Robot": 1,  # Very slow tests!
        "plone.app.dexterity.testing.dexterity:Functional": 1,  # Very slow tests!
        "plone.app.linkintegrity.testing.plone.app.linkintegrity:DX:Functional": 1,
        "plone.app.multilingual.testing.plone.app.multilingual:Functional": 1,
        "plone.app.portlets.testing.PloneAppPortlets:Integration": 1,
        "plone.app.testing.bbb.PloneTestCase:Functional": 1,
        "plone.app.users.testing.PloneAppUsersLayer:FunctionalTesting": 1,
        "plone.restapi.testing.PloneRestApiDXLayer:Functional": 1,
        "plone.restapi.testing.PloneRestApiDXLayer:Integration": 1,
        "plone.restapi.testing.PloneRestApiDXLayerFreeze:Functional": 1,
        "Products.CMFEditions.testing.ProductsCmfeditionsLayer:IntegrationTesting": 1,
        "Products.CMFPlacefulWorkflow.testing.ProductsCmfplacefulworkflowLayer:FunctionalTesting": 1,
        "Products.CMFPlone.testing.CMFPloneLayer:Acceptance": 2,  # Many Robot suite setups!
        "Products.CMFPlone.testing.CMFPloneLayer:Functional": 1,
        "Products.CMFPlone.testing.CMFPloneLayer:Integration": 1,
        "zope.testrunner.layer.UnitTests": 1,  # Very many quick tests
    }

    if main():
        exit(0)
    exit(1)