# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. """ Graph morphs are modifications to task-graphs that take place *after* the optimization phase. These graph morphs are largely invisible to developers running `./mach` locally, so they should be limited to changes that do not modify the meaning of the graph. """ # Note that the translation of `{'task-reference': '..'}` and # `artifact-reference` are handled in the optimization phase (since # optimization involves dealing with taskIds directly). Similarly, # `{'relative-datestamp': '..'}` is handled at the last possible moment during # task creation. import logging import os import re from slugid import nice as slugid from .graph import Graph from .task import Task from .taskgraph import TaskGraph from .util.workertypes import get_worker_type here = os.path.abspath(os.path.dirname(__file__)) logger = logging.getLogger(__name__) MAX_ROUTES = 64 registered_morphs = [] def register_morph(func): registered_morphs.append(func) return func def amend_taskgraph(taskgraph, label_to_taskid, to_add): """Add the given tasks to the taskgraph, returning a new taskgraph""" new_tasks = taskgraph.tasks.copy() new_edges = set(taskgraph.graph.edges) for task in to_add: new_tasks[task.task_id] = task assert task.label not in label_to_taskid label_to_taskid[task.label] = task.task_id for depname, dep in task.dependencies.items(): new_edges.add((task.task_id, dep, depname)) taskgraph = TaskGraph(new_tasks, Graph(set(new_tasks), new_edges)) # type: ignore return taskgraph, label_to_taskid def derive_index_task(task, taskgraph, label_to_taskid, parameters, graph_config): """Create the shell of a task that depends on `task` and on the given docker image.""" purpose = "index-task" label = f"{purpose}-{task.label}" provisioner_id, worker_type = get_worker_type( graph_config, "misc", parameters["level"] ) task_def = { "provisionerId": provisioner_id, "workerType": worker_type, "dependencies": [task.task_id], "created": {"relative-datestamp": "0 seconds"}, "deadline": task.task["deadline"], # no point existing past the parent task's deadline "expires": task.task["deadline"], "metadata": { "name": label, "description": "{} for {}".format( purpose, task.task["metadata"]["description"] ), "owner": task.task["metadata"]["owner"], "source": task.task["metadata"]["source"], }, "scopes": [], "payload": { "image": { "path": "public/image.tar.zst", "namespace": "taskgraph.cache.level-3.docker-images.v2.index-task.latest", "type": "indexed-image", }, "features": { "taskclusterProxy": True, }, "maxRunTime": 600, }, } # only include the docker-image dependency here if it is actually in the # taskgraph (has not been optimized). It is included in # task_def['dependencies'] unconditionally. dependencies = {"parent": task.task_id} task = Task( kind="misc", label=label, attributes={}, task=task_def, dependencies=dependencies, ) task.task_id = slugid() # type: ignore return task, taskgraph, label_to_taskid def make_index_task(parent_task, taskgraph, label_to_taskid, parameters, graph_config): index_paths = [ r.split(".", 1)[1] for r in parent_task.task["routes"] if r.startswith("index.") ] parent_task.task["routes"] = [ r for r in parent_task.task["routes"] if not r.startswith("index.") ] task, taskgraph, label_to_taskid = derive_index_task( parent_task, taskgraph, label_to_taskid, parameters, graph_config ) # we need to "summarize" the scopes, otherwise a particularly # namespace-heavy index task might have more scopes than can fit in a # temporary credential. scopes = set() domain_index_regex = re.compile( r"({trust_domain}\.v2\.[^.]*\.).*".format( trust_domain=re.escape(graph_config["trust-domain"]) ) ) index_path_res = [domain_index_regex] for path in graph_config["taskgraph"].get("index-path-regexes", ()): index_path_res.append(re.compile(path)) for path in index_paths: for index_path_re in index_path_res: match = index_path_re.match(path) if match: path = match.group(1) + "*" break scope = f"index:insert-task:{path}" scopes.add(scope) task.task["scopes"] = sorted(scopes) task.task["payload"]["command"] = ["insert-indexes.js"] + index_paths task.task["payload"]["env"] = { "TARGET_TASKID": parent_task.task_id, "INDEX_RANK": parent_task.task.get("extra", {}).get("index", {}).get("rank", 0), } return task, taskgraph, label_to_taskid @register_morph def add_index_tasks(taskgraph, label_to_taskid, parameters, graph_config): """ The TaskCluster queue only allows 64 routes on a task. In the event a task exceeds this limit, this graph morph adds "index tasks" that depend on it and do the index insertions directly, avoiding the limit on task.routes. """ logger.debug("Morphing: adding index tasks") added = [] for label, task in taskgraph.tasks.items(): if len(task.task.get("routes", [])) <= MAX_ROUTES: continue task, taskgraph, label_to_taskid = make_index_task( task, taskgraph, label_to_taskid, parameters, graph_config ) added.append(task) if added: taskgraph, label_to_taskid = amend_taskgraph(taskgraph, label_to_taskid, added) logger.info(f"Added {len(added)} index tasks") return taskgraph, label_to_taskid def _get_morph_url(): """ Guess a URL for the current file, for source metadata for created tasks. If we checked out the taskgraph code with run-task in the decision task, we can use TASKGRAPH_* to find the right version, which covers the existing use case. """ taskgraph_repo = os.environ.get( "TASKGRAPH_HEAD_REPOSITORY", "https://github.com/taskcluster/taskgraph" ) taskgraph_rev = os.environ.get("TASKGRAPH_HEAD_REV", "default") return f"{taskgraph_repo}/raw-file/{taskgraph_rev}/src/taskgraph/morph.py" @register_morph def add_code_review_task(taskgraph, label_to_taskid, parameters, graph_config): logger.debug("Morphing: adding code review task") review_config = parameters.get("code-review") if not review_config: return taskgraph, label_to_taskid code_review_tasks = {} for label, task in taskgraph.tasks.items(): if task.attributes.get("code-review"): code_review_tasks[task.label] = task.task_id if code_review_tasks: code_review_task_def = { "provisionerId": "built-in", "workerType": "succeed", "dependencies": sorted(code_review_tasks.values()), # This option permits to run the task # regardless of the dependencies tasks exit status # as we are interested in the task failures "requires": "all-resolved", "created": {"relative-datestamp": "0 seconds"}, "deadline": {"relative-datestamp": "1 day"}, # no point existing past the parent task's deadline "expires": {"relative-datestamp": "1 day"}, "metadata": { "name": "code-review", "description": "List all issues found in static analysis and linting tasks", "owner": parameters["owner"], "source": _get_morph_url(), }, "scopes": [], "payload": {}, "routes": ["project.relman.codereview.v1.try_ending"], "extra": { "code-review": { "phabricator-build-target": review_config[ "phabricator-build-target" ], "repository": parameters["head_repository"], "revision": parameters["head_rev"], } }, } task = Task( kind="misc", label="code-review", attributes={}, task=code_review_task_def, dependencies=code_review_tasks, ) task.task_id = slugid() # type: ignore taskgraph, label_to_taskid = amend_taskgraph(taskgraph, label_to_taskid, [task]) logger.info("Added code review task.") return taskgraph, label_to_taskid def morph(taskgraph, label_to_taskid, parameters, graph_config): """Apply all morphs""" for m in registered_morphs: taskgraph, label_to_taskid = m( taskgraph, label_to_taskid, parameters, graph_config ) return taskgraph, label_to_taskid