# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import time from functools import reduce from mozterm import Terminal from ..handlers import SummaryHandler from . import base from .process import strstatus from .tbplformatter import TbplFormatter color_dict = { "log_test_status_fail": "red", "log_process_output": "blue", "log_test_status_pass": "green", "log_test_status_unexpected_fail": "red", "log_test_status_known_intermittent": "yellow", "time": "cyan", "action": "yellow", "pid": "cyan", "heading": "bold_yellow", "sub_heading": "yellow", "error": "red", "warning": "yellow", "bold": "bold", "grey": "grey", "normal": "normal", "bright_black": "bright_black", } DEFAULT = "\x1b(B\x1b[m" def format_seconds(total): """Format number of seconds to MM:SS.DD form.""" minutes, seconds = divmod(total, 60) return "%2d:%05.2f" % (minutes, seconds) class TerminalColors(object): def __init__(self, term, color_dict): for key, value in color_dict.items(): attribute = getattr(term, value) # In Blessed, these attributes aren't always callable. We can assume # that if they're not, they're just the raw ANSI Escape Sequences. # This TerminalColors class is basically just a lookup table for # what function to call to format/color an input string a certain way. # So if the attribute above is a callable, we can just proceed, but # if it's not, we need to create our own function that prepends the # raw ANSI Escape Sequences to the input string, so that everything # has the same behavior. We append DEFAULT to reset to no formatting # at the end of our string, to prevent text that comes afterwards # from inheriting the prepended formatting. if not callable(attribute): def apply_formatting(text): return attribute + text + DEFAULT attribute = apply_formatting setattr(self, key, attribute) class MachFormatter(base.BaseFormatter): def __init__( self, start_time=None, write_interval=False, write_times=True, terminal=None, disable_colors=False, summary_on_shutdown=False, verbose=False, enable_screenshot=False, **kwargs ): super(MachFormatter, self).__init__(**kwargs) if start_time is None: start_time = time.time() start_time = int(start_time * 1000) self.start_time = start_time self.write_interval = write_interval self.write_times = write_times self.status_buffer = {} self.has_unexpected = {} self.last_time = None self.color_formatter = TerminalColors( Terminal(disable_styling=disable_colors), color_dict ) self.verbose = verbose self._known_pids = set() self.tbpl_formatter = None self.enable_screenshot = enable_screenshot self.summary = SummaryHandler() self.summary_on_shutdown = summary_on_shutdown message_handlers = { "colors": { "on": self._enable_colors, "off": self._disable_colors, }, "summary_on_shutdown": { "on": self._enable_summary_on_shutdown, "off": self._disable_summary_on_shutdown, }, } for topic, handlers in message_handlers.items(): self.message_handler.register_message_handlers(topic, handlers) def __call__(self, data): self.summary(data) s = super(MachFormatter, self).__call__(data) if s is None: return time = self.color_formatter.time(format_seconds(self._time(data))) return "%s %s\n" % (time, s) def _enable_colors(self): self.disable_colors = False def _disable_colors(self): self.disable_colors = True def _enable_summary_on_shutdown(self): self.summary_on_shutdown = True def _disable_summary_on_shutdown(self): self.summary_on_shutdown = False def _get_test_id(self, data): test_id = data.get("test") if isinstance(test_id, list): test_id = tuple(test_id) return test_id def _get_file_name(self, test_id): if isinstance(test_id, str): return test_id if isinstance(test_id, tuple): return "".join(test_id) assert False, "unexpected test_id" def suite_start(self, data): num_tests = reduce(lambda x, y: x + len(y), data["tests"].values(), 0) action = self.color_formatter.action(data["action"].upper()) name = "" if "name" in data: name = " %s -" % (data["name"],) return "%s:%s running %i tests" % (action, name, num_tests) def suite_end(self, data): action = self.color_formatter.action(data["action"].upper()) rv = [action] if not self.summary_on_shutdown: rv.append( self._format_suite_summary( self.summary.current_suite, self.summary.current ) ) return "\n".join(rv) def _format_expected(self, status, expected, known_intermittent=[]): if status == expected: color = self.color_formatter.log_test_status_pass if expected not in ("PASS", "OK"): color = self.color_formatter.log_test_status_fail status = "EXPECTED-%s" % status else: if status in known_intermittent: color = self.color_formatter.log_test_status_known_intermittent status = "KNOWN-INTERMITTENT-%s" % status else: color = self.color_formatter.log_test_status_fail if status in ("PASS", "OK"): status = "UNEXPECTED-%s" % status return color(status) def _format_status(self, test, data): name = data.get("subtest", test) rv = "%s %s" % ( self._format_expected( data["status"], data.get("expected", data["status"]), data.get("known_intermittent", []), ), name, ) if "message" in data: rv += " - %s" % data["message"] if "stack" in data: rv += self._format_stack(data["stack"]) return rv def _format_stack(self, stack): return "\n%s\n" % self.color_formatter.bright_black(stack.strip("\n")) def _format_suite_summary(self, suite, summary): count = summary["counts"] logs = summary["unexpected_logs"] intermittent_logs = summary["intermittent_logs"] harness_errors = summary["harness_errors"] rv = [ "", self.color_formatter.sub_heading(suite), self.color_formatter.sub_heading("~" * len(suite)), ] # Format check counts checks = self.summary.aggregate("count", count) rv.append( "Ran {} checks ({})".format( sum(checks.values()), ", ".join( ["{} {}s".format(v, k) for k, v in sorted(checks.items()) if v] ), ) ) # Format expected counts checks = self.summary.aggregate("expected", count, include_skip=False) intermittent_checks = self.summary.aggregate( "known_intermittent", count, include_skip=False ) intermittents = sum(intermittent_checks.values()) known = ( " ({} known intermittents)".format(intermittents) if intermittents else "" ) rv.append("Expected results: {}{}".format(sum(checks.values()), known)) # Format skip counts skip_tests = count["test"]["expected"]["skip"] skip_subtests = count["subtest"]["expected"]["skip"] if skip_tests: skipped = "Skipped: {} tests".format(skip_tests) if skip_subtests: skipped = "{}, {} subtests".format(skipped, skip_subtests) rv.append(skipped) # Format unexpected counts checks = self.summary.aggregate("unexpected", count) unexpected_count = sum(checks.values()) rv.append("Unexpected results: {}".format(unexpected_count)) if unexpected_count: for key in ("test", "subtest", "assert"): if not count[key]["unexpected"]: continue status_str = ", ".join( [ "{} {}".format(n, s) for s, n in sorted(count[key]["unexpected"].items()) ] ) rv.append( " {}: {} ({})".format( key, sum(count[key]["unexpected"].values()), status_str ) ) # Format intermittents if intermittents > 0: heading = "Known Intermittent Results" rv.extend( [ "", self.color_formatter.heading(heading), self.color_formatter.heading("-" * len(heading)), ] ) if count["subtest"]["count"]: for test_id, results in intermittent_logs.items(): test = self._get_file_name(test_id) rv.append(self.color_formatter.bold(test)) for data in results: rv.append(" %s" % self._format_status(test, data).rstrip()) else: for test_id, results in intermittent_logs.items(): test = self._get_file_name(test_id) for data in results: assert "subtest" not in data rv.append(self._format_status(test, data).rstrip()) # Format status testfailed = any( count[key]["unexpected"] for key in ("test", "subtest", "assert") ) if not testfailed and not harness_errors: rv.append(self.color_formatter.log_test_status_pass("OK")) else: # Format test failures heading = "Unexpected Results" rv.extend( [ "", self.color_formatter.heading(heading), self.color_formatter.heading("-" * len(heading)), ] ) if count["subtest"]["count"]: for test_id, results in logs.items(): test = self._get_file_name(test_id) rv.append(self.color_formatter.bold(test)) for data in results: rv.append(" %s" % self._format_status(test, data).rstrip()) else: for test_id, results in logs.items(): test = self._get_file_name(test_id) for data in results: assert "subtest" not in data rv.append(self._format_status(test, data).rstrip()) # Format harness errors if harness_errors: for data in harness_errors: rv.append(self.log(data)) return "\n".join(rv) def test_start(self, data): action = self.color_formatter.action(data["action"].upper()) return "%s: %s" % (action, self._get_test_id(data)) def test_end(self, data): subtests = self._get_subtest_data(data) if "expected" in data and data["status"] not in data.get( "known_intermittent", [] ): parent_unexpected = True expected_str = ", expected %s" % data["expected"] else: parent_unexpected = False expected_str = "" has_screenshots = "reftest_screenshots" in data.get("extra", {}) test = self._get_test_id(data) # Reset the counts to 0 self.status_buffer[test] = {"count": 0, "unexpected": 0, "pass": 0} self.has_unexpected[test] = bool(subtests["unexpected"]) if subtests["count"] != 0: rv = "Test %s%s. Subtests passed %i/%i. Unexpected %s" % ( data["status"], expected_str, subtests["pass"], subtests["count"], subtests["unexpected"], ) else: rv = "%s%s" % (data["status"], expected_str) unexpected = self.summary.current["unexpected_logs"].get(data["test"]) if unexpected: if len(unexpected) == 1 and parent_unexpected: message = unexpected[0].get("message", "") if message: rv += " - %s" % message if "stack" in data: rv += self._format_stack(data["stack"]) elif not self.verbose: rv += "\n" for d in unexpected: rv += self._format_status(data["test"], d) intermittents = self.summary.current["intermittent_logs"].get(data["test"]) if intermittents: rv += "\n" for d in intermittents: rv += self._format_status(data["test"], d) if "expected" not in data and not bool(subtests["unexpected"]): color = self.color_formatter.log_test_status_pass else: color = self.color_formatter.log_test_status_unexpected_fail action = color(data["action"].upper()) rv = "%s: %s" % (action, rv) if has_screenshots and self.enable_screenshot: if self.tbpl_formatter is None: self.tbpl_formatter = TbplFormatter() # Create TBPL-like output that can be pasted into the reftest analyser rv = "\n".join((rv, self.tbpl_formatter.test_end(data))) return rv def valgrind_error(self, data): rv = " " + data["primary"] + "\n" for line in data["secondary"]: rv = rv + line + "\n" return rv def lsan_leak(self, data): allowed = data.get("allowed_match") if allowed: prefix = self.color_formatter.log_test_status_fail("FAIL") else: prefix = self.color_formatter.log_test_status_unexpected_fail( "UNEXPECTED-FAIL" ) return "%s LeakSanitizer: leak at %s" % (prefix, ", ".join(data["frames"])) def lsan_summary(self, data): allowed = data.get("allowed", False) if allowed: prefix = self.color_formatter.warning("WARNING") else: prefix = self.color_formatter.error("ERROR") return ( "%s | LeakSanitizer | " "SUMMARY: AddressSanitizer: %d byte(s) leaked in %d allocation(s)." % (prefix, data["bytes"], data["allocations"]) ) def mozleak_object(self, data): data_log = data.copy() data_log["level"] = "INFO" data_log["message"] = "leakcheck: %s leaked %d %s" % ( data["process"], data["bytes"], data["name"], ) return self.log(data_log) def mozleak_total(self, data): if data["bytes"] is None: # We didn't see a line with name 'TOTAL' if data.get("induced_crash", False): data_log = data.copy() data_log["level"] = "INFO" data_log["message"] = ( "leakcheck: %s deliberate crash and thus no leak log\n" % data["process"] ) return self.log(data_log) if data.get("ignore_missing", False): return ( "%s ignoring missing output line for total leaks\n" % data["process"] ) status = self.color_formatter.log_test_status_pass("FAIL") return "%s leakcheck: " "%s missing output line for total leaks!\n" % ( status, data["process"], ) if data["bytes"] == 0: return "%s leakcheck: %s no leaks detected!\n" % ( self.color_formatter.log_test_status_pass("PASS"), data["process"], ) message = "leakcheck: %s %d bytes leaked\n" % (data["process"], data["bytes"]) # data["bytes"] will include any expected leaks, so it can be off # by a few thousand bytes. failure = data["bytes"] > data["threshold"] status = ( self.color_formatter.log_test_status_unexpected_fail("UNEXPECTED-FAIL") if failure else self.color_formatter.log_test_status_fail("FAIL") ) return "%s %s\n" % (status, message) def test_status(self, data): test = self._get_test_id(data) if test not in self.status_buffer: self.status_buffer[test] = {"count": 0, "unexpected": 0, "pass": 0} self.status_buffer[test]["count"] += 1 if data["status"] == "PASS": self.status_buffer[test]["pass"] += 1 if "expected" in data and data["status"] not in data.get( "known_intermittent", [] ): self.status_buffer[test]["unexpected"] += 1 if self.verbose: return self._format_status(test, data).rstrip("\n") def assertion_count(self, data): if data["min_expected"] <= data["count"] <= data["max_expected"]: return if data["min_expected"] != data["max_expected"]: expected = "%i to %i" % (data["min_expected"], data["max_expected"]) else: expected = "%i" % data["min_expected"] action = self.color_formatter.log_test_status_fail("ASSERT") return "%s: Assertion count %i, expected %s assertions\n" % ( action, data["count"], expected, ) def process_output(self, data): rv = [] pid = data["process"] if pid.isdigit(): pid = "pid:%s" % pid pid = self.color_formatter.pid(pid) if "command" in data and data["process"] not in self._known_pids: self._known_pids.add(data["process"]) rv.append("%s Full command: %s" % (pid, data["command"])) rv.append("%s %s" % (pid, data["data"])) return "\n".join(rv) def crash(self, data): test = self._get_test_id(data) if data.get("stackwalk_returncode", 0) != 0 and not data.get( "stackwalk_stderr" ): success = True else: success = False rv = [ "pid:%s. Process type: %s. Test:%s. Minidump analysed:%s. Signature:[%s]" % ( data.get("pid", "unknown"), data.get("process_type", None), test, success, data["signature"], ) ] if data.get("java_stack"): rv.append("Java exception: %s" % data["java_stack"]) else: if data.get("reason"): rv.append("Mozilla crash reason: %s" % data["reason"]) if data.get("minidump_path"): rv.append("Crash dump filename: %s" % data["minidump_path"]) if data.get("stackwalk_returncode", 0) != 0: rv.append( "minidump-stackwalk exited with return code %d" % data["stackwalk_returncode"] ) if data.get("stackwalk_stderr"): rv.append("stderr from minidump-stackwalk:") rv.append(data["stackwalk_stderr"]) elif data.get("stackwalk_stdout"): rv.append(data["stackwalk_stdout"]) if data.get("stackwalk_errors"): rv.extend(data.get("stackwalk_errors")) rv = "\n".join(rv) if not rv[-1] == "\n": rv += "\n" action = self.color_formatter.action(data["action"].upper()) return "%s: %s" % (action, rv) def process_start(self, data): rv = "Started process `%s`" % data["process"] desc = data.get("command") if desc: rv = "%s (%s)" % (rv, desc) return rv def process_exit(self, data): return "%s: %s" % (data["process"], strstatus(data["exitcode"])) def log(self, data): level = data.get("level").upper() if level in ("CRITICAL", "ERROR"): level = self.color_formatter.error(level) elif level == "WARNING": level = self.color_formatter.warning(level) elif level == "INFO": level = self.color_formatter.log_process_output(level) if data.get("component"): rv = " ".join([data["component"], level, data["message"]]) else: rv = "%s %s" % (level, data["message"]) if "stack" in data: rv += "\n%s" % data["stack"] return rv def lint(self, data): fmt = ( "{path} {c1}{lineno}{column} {c2}{level}{normal} {message}" " {c1}{rule}({linter}){normal}" ) message = fmt.format( path=data["path"], normal=self.color_formatter.normal, c1=self.color_formatter.grey, c2=self.color_formatter.error if data["level"] == "error" else (self.color_formatter.log_test_status_fail), lineno=str(data["lineno"]), column=(":" + str(data["column"])) if data.get("column") else "", level=data["level"], message=data["message"], rule="{} ".format(data["rule"]) if data.get("rule") else "", linter=data["linter"].lower() if data.get("linter") else "", ) return message def shutdown(self, data): if not self.summary_on_shutdown: return heading = "Overall Summary" rv = [ "", self.color_formatter.heading(heading), self.color_formatter.heading("=" * len(heading)), ] for suite, summary in self.summary: rv.append(self._format_suite_summary(suite, summary)) return "\n".join(rv) def _get_subtest_data(self, data): test = self._get_test_id(data) return self.status_buffer.get(test, {"count": 0, "unexpected": 0, "pass": 0}) def _time(self, data): entry_time = data["time"] if self.write_interval and self.last_time is not None: t = entry_time - self.last_time self.last_time = entry_time else: t = entry_time - self.start_time return t / 1000.0