#!/usr/bin/env python3 # Release: Tue Jul 29 04:24:20 PM EDT 2025 """Testy: run test cases on command line programs""" __version__ = "0.8.1" ### Imports import sys # argv etc from io import StringIO # StringIO import os # directly do fork() / exec(), avoid subprocess import signal # for SIGKILL import os.path # joining output file paths import re # regexes import argparse # command args import shlex # string split respecting quoting import shutil # which, file copying, etc. import time # time.monotonic() import codecs # characeter coding translattions import select # poll from select import POLLIN,POLLOUT,POLLHUP,POLLERR,POLLNVAL from difflib import SequenceMatcher # diffing actual/expect output import logging as log # for loggers / debug messages import multiprocessing as multiproc import subprocess ################################################################################ ### Module level initialization LOGFORMAT = "==%(process)d %(levelname)s== [%(filename)s:%(lineno)d %(funcName)s] %(message)s" """Format for debug messages""" # Register translation mechanisms for bad ASCII characters in test # output. These are passed as the 'errors' argument to Popen() when # ASCII encoding is in use for output. codecs.register_error('as_question_marks', lambda e: ('?',e.start + 1)) codecs.register_error('as_nonascii', lambda e: (':nonascii:',e.start + 1)) # default valgrind program VALGRIND_ERROR_CODE = 13 """Error code that valgrind is requested to return on finding errrors""" VALGRIND_PROG = [ "valgrind", f"--error-exitcode={VALGRIND_ERROR_CODE}", "--leak-check=full", "--show-leak-kinds=all", "--track-origins=yes", ] """valgrind invocation broken down by options""" STDBUF_PROG = ["stdbuf","-i","0","-o","0","-e","0"] """stdbuf invocation broken down by options""" SIGNAL_DESCRIPTIONS = { # may wish to use the sys.SIGxx versions of the numbers 1 : " : controlling terminal has closed", 2 : " : interrupted through user interaction", 3 : " : user-triggered quit with core dump requested", 4 : " : invalid or illegal instruction executed", 6 : " : process self-terminated abnormally through the abort() function", 7 : " : BUS ERROR while attempting to access a memory address that is ill-formatted", 8 : " : arithmetic error, usually INTEGER DIVISION BY 0", 9 : " : killed by explicit request, usually initiated by user possibly due to timeouts / excessive output", 10 : " : user-defined signal", 11 : " : SEGMENTATION FAULT, memory problem such as out-of-bounds access", 12 : " : user-defined signal", 13 : " : communication through a pipe that was cut off", 14 : " : alarm expired in program", 15 : " : termination explicitly requested, usually initiated by user", } """Dict of signal numbers to descriptions""" ENVVARS_HONORED = { "show", # merged all of show, showfail, single_show_fail to this "overall_result_file", "results_dir", "save_rawfiles", "timeout", "parallel", "diff_sym_equal", "diff_ignore_whitespace", "diff_ignore_blanklines", } """Set of environment variables that are checked during parsing and honored if present as though they appeared after top-level options in the test file. Names are Case-insesnsitive: all are lower-cased when evaluated. """ ################################################################################ ### Data Types associated with tests class Suite: """Encapsulates a collection of tests""" def __init__(self): """Initialize a default empty Suite""" # these fields are set during parsing / initialization, most are options governing behavior self.tests = {} # tests in the suite in a dictioary with numeric keys starting at 1; first test is "test 1" self.tests_torun = [] # tests in the suite to run, set after parsing self.test_opts = {} # key/val options affecting tests and sessions set in the preamble self.title = None # title of the suite from the #+title: directive, None will use the file name self.description = "" # description in preamble that aren't part of any test self.preamble = "" # includes all premble text for regeneration self.use_points = False # True if points should be used self.points_possible = 0 # total possible points if points are in use, set during parsing self.points_scale = 1.0 # Set to a float scaling factor points should be scaled (e.t. 0.5 to halve everything) self.show = "singlefail" # Whether to print failures in the terminal, ["all","fail","singlefail" ,"none"="False"=*] self.filename = None # filename from which the suite came from or None if not from a file self.overall_result_file = False # True produce an overall result file self.parallel = False # Set to int to use multiple procs or True/auto for max procs self.regen = False # whether or not the suite is being regenerated # these fields are set during/after the suite evaluation self.passed_tests = 0 # total tests passed self.points_earned = 0 # total points earned among all tests # these fields are set during formatting self.result = None # formatted result for test, usually a string but may be anythingthe formatter finds usesful self.result_filename = None # file where result for the test is stored, None if no file has been created def setup_regen(self): """Toggle the regen field of thie suite and its tests to True so that regenerated test results can be produced""" self.regen = True for t in self.tests.values(): t.regen = True def run(self, progress_reporter, result_formatter): """Run tests in this suite that are requested. Fills in the result field of the suites and tests that are run as well as generating output files if fields in the suite/test indicate to do so. """ if self.parallel: # parallel execution requested, dispatch to that method self.run_parallel(progress_reporter, result_formatter) return progress_reporter.report_suite_start(self) # otherwise do serial execution for test in self.tests_torun: # reach each test requested, format results test.run() result_formatter.add_test_result(test) result_formatter.make_test_result_file(test) result_formatter.make_test_rawfiles(test) progress_reporter.report_test_done(test) if test.passed: self.passed_tests += 1 self.points_earned += test.points self.points_possible *= self.points_scale self.points_earned *= self.points_scale result_formatter.add_suite_result(self) result_formatter.make_suite_result_file(self) progress_reporter.report_suite_done(self) def get_proc_pool(self): """Create a pool of processes for multiple test processing Honors the self.parallel which may be an integer specifying the numbe of processes or the string "AUTO" which will select a number of processes equal to the total CPUs available. If it is False, use pool size 1. """ pool_size = 1 if type(self.parallel) == int: pool_size = self.parallel elif self.parallel in [True,'yes','YES','auto','AUTO','max','MAX']: pool_size = os.cpu_count() log.debug(f"parallel is {self.parallel}, setting pool_size to {pool_size}") return multiproc.Pool(pool_size) def run_parallel(self, progress_reporter, result_formatter): """Run tests in parallel using multiple proceses Uses proces pool produced by get_proc_pool() to execute tests in multiple processes speeding up execution in multi-CPU environments. Processes usually provide better speedup than threads due to Python's poor thread support. """ # NOTES: imap() is preferred as alternative as it returns # individual elements immediately on being available while map() # and starmap() wait for completion of all elements. imap() only # support a 1-arg function for mapping; overcome this by passing a # 2-tuple of test/formatter which allows the test to be formatted # on completion. The test spat out is received via IPC so is # distinct from the test passed in; copy the received and # completed test over to this process to ensure the data is # retained. progress_reporter.report_suite_start(self) args = [(test,result_formatter) # package test/formatter for test in self.tests_torun] self.tests_torun = [] # must rebuild this list due to parallel ops with self.get_proc_pool() as pool: for test in pool.imap(run_test, args, 1): # imap for lazy returns self.tests[test.testnum] = test # copy test over self.tests_torun.append(test) # rebuild torun list progress_reporter.report_test_done(test) if test.passed: self.passed_tests += 1 self.points_earned += test.points self.points_possible *= self.points_scale self.points_earned *= self.points_scale result_formatter.add_suite_result(self) result_formatter.make_suite_result_file(self) progress_reporter.report_suite_done(self) def honor_envvars(self): """Check environment to propagate honored variables to suite/tests This method is called during parsing AFTER reading the suite preamble is processed so that environment variables override premable options in the Suite. Variables that directly match suite field names are set to the indicated value while any others are added to the test_opts[] list to be propogated to tests. """ env = {k.lower() : v for k,v in os.environ.items()} # lowercase environment variables for var in ENVVARS_HONORED: var = var.lower() if var in env: val = env[var] if val != "all": # hack to avoid a built-in used for show try: # try converting to internal python val = eval(val) except Exception: pass # if it fails, leave as a string log.debug(f"{var}={val} option set from environment") if var in self.__dict__: self.__dict__[var] = val # override suite option else: self.test_opts[var] = val # override test/segment option def purge_bash_funcs_envvars(self): """Removes exported bash funcs from environment variables. Environment variables matching teh pattern 'BASH_FUNC_...%%' are exported BASH shell functions and are removed from the environment as when they propogate to the a `bash -v` call, they are usually echoed which corrupts test behavior. Often Unix/Linux systems that use the 'module' software system have these. """ for key in list(os.environ.keys()): if key.startswith("BASH_FUNC_") and key.endswith("%%"): os.environ.pop(key) class Test: """Encapsulates a single test which may have multiple segments""" def __init__(self,options=None): """Initialize a default empty Test""" # these fields are set during parsing / initialization self.title = None # title of the test self.linenum = None # line number on which test starts self.filename = None # file that test originated from self.testnum = None # number of the test among the suite (1-indexed) self.points = 1.0 # number of points assigned, float possible self.segments = [] # list of segments that comprise the self.test_directory = None # directory to use for this test or None if run in the working directory self.description = "" # description of test absent # comments self.preamble = "" # includes # comments, #+TESTY: directives, blank lines self.make_result_file = True # True produces individual test result files self.prefix = "test" # file prefix for result output files if any are created self.results_dir = "test-results" # base directory for test results files self.results_dir_early = False # whether to create results_dir before running the test self.save_rawfiles = False # whether to retain any raw input/output files in the raw/ directory self.raw_dir = "raw" # location of raw output files such as Valgrind logs, relative to results_dir (e.g. test-results/raw) self.regen = False # whether or not the test result is being regenerated; True means run all segmenets # these fields are set during/after the test evaluation self.passed = None # True for pass, False for fail, None for not run yet # these fields are set during formatting self.result = None # formatted result for test, usually a string but may be anythingthe formatter finds usesful self.result_filename = None # file where result for the test is stored, None if no file has been created # propagate fields from options to object if options: for key,val in options.items(): if key in self.__dict__: self.__dict__[key] = val def run(self): """Run this test to see if it it passes""" log.debug(f"running test number {self.testnum} : {self.title}") if self.results_dir_early: # create results directory early if requested subprocess.run(['mkdir','-p',self.results_dir],check=True) self.passed = True workdir = os.getcwd() if self.test_directory: log.debug(f"changing to requested test directory: {self.test_directory}") subprocess.run(['rm','-rf',self.test_directory],check=True) subprocess.run(['mkdir','-p',self.test_directory],check=True) os.chdir(self.test_directory) for (segi,seg) in enumerate(self.segments): log.debug(f"starting segment {segi}: {seg.title}") seg.run() if not seg.passed: # test segement failed self.passed = False # mark test as failed if not self.regen: # not regenerated so quit the test; during regen all segments are run break if self.test_directory: log.debug(f"returning to workdir: {workdir}") os.chdir(workdir) class Segment: """Encapsulate a segment of a test Segments run a particular program and check that its output and behavior match an expecation. They comprise some preamble / comments followed by a session which shows a transcript of what is to be done in the test along with its output. """ def __init__(self,options=None): """Initialize a default empty test Segment""" # these fields are obtained from parsing / initializing the segment self.title = None # title for segments self.linenum = None # line number on which segment starts self.description = "" # description of segment absent # comments self.preamble = "" # includes # comments, #+TESTY: directives, blank lines self.shell_precommands = [] # list of shell commands to be run prior to running the segment self.program = "bash -v" # program to run for the segment; special behavior for QUOTE and COMMENT self.prompt = ">>" # prompt string used by the program being run self.echoing = "input" # style of echoing done by the program, "input" echoes input, "both" for prompt+input echoing self.session = "" # string with lines of the session of input/output to be used self.use_valgrind = False # whether to run program under valgrind self.valgrind_reachable = True # whether to count reachable memory as an error in valgrind; usually reachable memory is fopen() with no fclose() self.valgrind_opts = "" # additional options to pass to valgrind such as suppression self.use_stdbuf = True # whether to run program under stdbuf program to eliminate I/O buffering self.skip_diff = False # True if diffing the expect/actual should be skipped self.skip_exitcode = False # True if the expected exit code should be checked (e.g. not trigger a failure) self.exitcode_expect = 0 # expected exit code from program self.force_ascii_output = True # translate non-ascii characters in test results to ascii self.timeout = 5.0 # timeout in (fractional) second to be used before segment is killed self.max_out_bytes = 2**20 # maximum size of output from tested program, kill if this is exceeded self.infinite_diff_limit = 100 # limit diff output to this many entries on a timeout / maxout self.post_filter = None # filter to run on output after test completes, before verifying output self.test_directory = None # directory to use for this test or None if run in the working directory self.diff_ignore_whitespace = True # Whitespace on a line is treated as single spaces when diffing self.diff_ignore_blanklines = True # Blank lines do not affect diff results self.diff_ignore_trail_ws = True # Any trailing whitespace is stripped / ignored when doing diffs self.diff_sym_equal = "." # Symbol to use in side-by-side diff for equal lines # these fields are set after the segment is run self.full_program = [] # full program invocation, may be decorated (usually with valgrind call) self.input_str = "" # string of input extracted from session self.output_expect = "" # expected output extracted from session self.pid = None # process ID of child process self.output_actual = None # output program actually produces self.output_original = None # original program output prior to applying post_filter, None if no post filter is used self.output_valgrind = None # output from Valgrind when in use self.sbs_diff = None # side-by-side diff self.sbs_diff_key = None # string showing meaning of symbols in side-by-side diff self.lbl_diff = None # line-by-line diff self.diff_passed = None # True if the diff passed, false otherwise self.exitcode = None # exit code of process that was run self.messages = [] # list of string messages indicating failures encountered self.timed_out = None # True if segment timed out during run self.maxed_out = None # True if segment produced more output than max_output_bytes self.passed = None # True for pass, False for fail, None for not run yet # propagate fields from options to object for key,val in options.items(): if key in self.__dict__: self.__dict__[key] = val def run(self): """Run an individual segment""" # Aiming to avoid any directory creation, file creation, etc. workdir = os.getcwd() if self.test_directory: log.debug(f"changing to requested test directory: {self.test_directory}") subprocess.run(['rm','-rf',self.test_directory],check=True) subprocess.run(['mkdir','-p',self.test_directory],check=True) os.chdir(self.test_directory) self.prerun_setup() # set up pending fields for the run if self.passed is False: # already failed in setup log.debug(f"failed test during prerun_setup()") return if self.program == "COMMENT": # special case: commented segment log.debug(f"automatic pass for commented segment") self.passed = True # always passes immediately return elif self.program.startswith("QUOTE"): # special case of quoted file as quote_file = self.program.split()[1] # #+BEGIN_QUOTE filename.txt log.debug(f"creating quoted file '{quote_file}'") with open(quote_file,"w") as out: # creates the file and populates its out.write(self.session) # contents; always passes self.passed = True return # GENERAL CASE of a program that should be run and output checked sys.stdout.flush() # flush pending output prior to fork() sys.stderr.flush() # to prevent duplicate output in parent/child (to_child_r,to_child_w) = os.pipe() # for communication between parent and child (fr_child_r,fr_child_w) = os.pipe() log.debug(f"to_child_r: {to_child_r} {os.get_inheritable(to_child_r)} to_child_w: {to_child_w} {os.get_inheritable(to_child_w)}") log.debug(f"fr_child_r: {fr_child_r} {os.get_inheritable(fr_child_r)} fr_child_w: {fr_child_w} {os.get_inheritable(fr_child_w)}") self.pid = os.fork() # create child process # CHILD PROCESS if self.pid == 0: os.close(to_child_w) os.close(fr_child_r) # NOTE: intentionally using constants 0,1,2 below. Previously # had calls like sys.stdin.fileno() BUT Python multiprocessing # messes with these may yield and they may be something other # than 0,1,2 that the child process will expect. This can lead # to weird child process behavior such as inability to send # input to the child. Thus, the constants 0,1,2 for stdin, # stdout, stderr file descriptor entries. os.dup2(to_child_r, 0) # redirect stdin, stdout, stderr to pipes os.dup2(fr_child_w, 1) # then execute a child process os.dup2(fr_child_w, 2) os.execvp(self.full_program[0],self.full_program) # child process does not return # PARENT PROCESS os.close(fr_child_w) os.close(to_child_r) log.debug(f"started program PID {self.pid}: [{' '.join(self.full_program)}]") (stdout_bytes,timed_out,maxed_out) = \ limited_communicate(to_child_w, # limit time/bytes for completion fr_child_r, self.input_str, timeout=self.timeout, max_out_bytes=self.max_out_bytes) if self.force_ascii_output: # possibly transform output to ascii out_encoding = 'ascii' out_errors = 'as_question_marks' else: out_encoding = 'utf-8' out_errors = 'as_question_marks' stdout_str = stdout_bytes.decode(encoding=out_encoding, errors=out_errors) if timed_out or maxed_out: # check for error conditions log.debug(f"timed out: {timed_out} / maxed_out: {maxed_out}, killing {self.pid}") os.kill(self.pid, signal.SIGKILL) # misbehaving, kill it log.debug(f"waiting on child process {self.pid}") (pid,waitstatus) = os.waitpid(self.pid, 0) # should return almost immediately log.debug(f"wait complete for child process {self.pid}, returned ({pid},{waitstatus})") # NOTE: using local version of this function as older versions of # python do not have it; replace with equivalent os module # function at a later date self.exitcode = testy_waitstatus_to_exitcode(waitstatus) # self.retcode = os.waitstatus_to_exitcode(waitstatus) if self.exitcode<0: # if signalled, append a message to output indicating as much signum = -self.exitcode sigdesc = SIGNAL_DESCRIPTIONS.get(signum,"") stdout_str = f'{stdout_str}\nSignal {signum} {sigdesc}' self.timed_out = timed_out # store info on ending status of process self.maxed_out = maxed_out self.output_original = stdout_str self.output_actual = stdout_str self.post_run_setup() # finalize remaining fields if self.test_directory: log.debug(f"returning to workdir: {workdir}") os.chdir(workdir) return def prerun_setup(self): """Completes any internal setup name before running""" self.set_full_program() self.set_input_str() self.set_output_expect() self.run_shell_precommands() def post_run_setup(self): """Finalize fields which can be set after the run finishes""" self.separate_valgrind_output() self.add_prompt_to_output() self.run_post_filter() self.diff_output() self.check_passed() ### Pre-run functions def add_prompt_to_output(self): """Modify output_actual to include the prompt on appropriate lines""" if self.echoing!="input": # method only works input echoing is enabled return # inappropriate for "both" echoing input_lines = self.input_str.splitlines() inpos = 0 with StringIO() as prompt_lines: for outline in self.output_actual.splitlines(): if inpos < len(input_lines) and outline==input_lines[inpos]: outline = f"{self.prompt} {outline}" inpos += 1 writeline(prompt_lines,outline) self.output_actual = prompt_lines.getvalue() def set_input_str(self): """Creates input_str by extracting prompt lines from session""" prompt_len = len(self.prompt) with StringIO() as instr: for line in self.session.splitlines(): if line.startswith(self.prompt): writeline(instr, line[prompt_len:].lstrip()) if line.startswith("#+TESTY_EOF"): break self.input_str = instr.getvalue() def set_output_expect(self): """Sets the expected output based on the session""" with StringIO() as out: for line in self.session.splitlines(): if not line.startswith("#+TESTY_EOF"): # exclude lines that signal end of input writeline(out,line) self.output_expect = out.getvalue() def set_full_program(self): """Sets full_program field which may include valgrind""" self.full_program = [] # set up the full arg list for the segment if self.use_stdbuf: # stdbuf first as valgrind output is truncated otherwise if not shutil.which("stdbuf"): self.passed = False self.messages.append("stdbuf not found for test that has use_stdbuf=1") return self.full_program.extend(STDBUF_PROG) if self.use_valgrind: # valgrind next if it is in use if not shutil.which("valgrind"): self.passed = False self.messages.append("Valgrind not found for test that has use_valgrind=1") return self.full_program.extend(VALGRIND_PROG) # add base valgrind program and options self.full_program.extend(shlex.split(self.valgrind_opts)) self.full_program.extend(shlex.split(self.program)) # add on actual program def go_to_testdir(self): """Change into the testing directory""" def run_shell_precommands(self): """Execute specified shell commands prior to segment""" for cmd in self.shell_precommands: log.debug(f"running shell command [{cmd}]") subprocess.run(cmd, shell=True, check=True) # shell=True important to support pipes, etc. ### Post-run functions def diff_output(self): """Calculate the diff of the output""" alines = self.output_expect.splitlines() # compare expected and actual blines = self.output_actual.splitlines() # output for the diff aseq = alines bseq = blines if self.diff_ignore_whitespace: # all whitespace treated as 1 whitespace aseq = [re.subn(r"\s+"," ",x)[0] for x in aseq] # modify sequence elements to compare to bseq = [re.subn(r"\s+"," ",x)[0] for x in bseq] # facilitate matching if self.diff_ignore_trail_ws: aseq = [x.rstrip() for x in aseq] # strip all trailing whitespace to bseq = [x.rstrip() for x in bseq] # facilitate matching blank_func = lambda x: False if self.diff_ignore_blanklines: # mark blank lines as blank which blank_func = lambda x: x=="" # will allow for 0-cost gaps on them (align,score) = global_alignment(aseq,bseq,blank_func=blank_func) seq_equal = not any([x in {"insert","delete","replace"} # presence of any of these means for (_,_,x,_) in align]) # sequences are not equal self.diff_passed = seq_equal or self.skip_diff # skip_diff causes this to always pass diff_limit = None if len(align) > self.infinite_diff_limit and self.timed_out or self.maxed_out: diff_limit = self.infinite_diff_limit # last = align[-1] # likely the kill signal align = align[:diff_limit] align.append(last) # diff_limit = min(self.infinite_diff_limit,len(align)) # on a timeout / maxout, limit the number # if self.timed_out or self.maxed_out: # of diff entries as they are expected # align = align[:diff_limit] # to be repeating and uninformative self.sbs_diff = sbs_diff(align, alines, blines, # compute side-by-side diff string sym_equal=self.diff_sym_equal, titles=("===EXPECT===","===ACTUAL===")) self.sbs_diff_key = f"{self.diff_sym_equal} lines match; | lines differ; < expected line missing; > extra line in actual" self.lbl_diff = lbl_diff(align, alines, blines, titles=("EXPECT","ACTUAL")) if diff_limit: # append messages to indicate truncation self.sbs_diff = f"{self.sbs_diff}Diff Truncated after {diff_limit} entries due to timeout / maxout\n" self.lbl_diff = f"{self.lbl_diff}Diff Truncated after {diff_limit} entries due to timeout / maxout\n" def check_passed(self): """Determines if the segment passed according to fields set during testing""" self.passed = True if self.exitcode==VALGRIND_ERROR_CODE: self.passed = False self.messages.append("Valgrind Errors: Check Valgrind section for details") if self.use_valgrind and self.valgrind_reachable: # usually reachable memory is fopen() with no fclose(), common # error especially among students so chck for this reachable_match = re.match(r"still reachable: (\d+) bytes",self.output_valgrind) if reachable_match and reachable_match[1]!="0": self.passed = False self.messages.append("Valgrind found Reachable Memory, calls to free(), fclose(), or other de-allocation are needed") if self.timed_out: self.passed = False self.messages.append(f"Timed Out: execution exceeded {self.timeout} seconds. Check for Infinite loops") if self.maxed_out: self.passed = False limit = None if self.max_out_bytes <= 8*2**10 : # <= 8K show bytes limit = f"{self.max_out_bytes} bytes" elif self.max_out_bytes <= 256*2**10: # <= 256K show kilobytes limit = f"{self.max_out_bytes / 2**10:.2f} kilobytes" else: # show megabytes; don't expect gigabyte output limit = f"{self.max_out_bytes / 2**20:.2f} megabyte(s)" self.messages.append(f"Max Output Exceeded: execution produced more than {limit} of output. Check for Infinite loops") if self.exitcode < 0: self.passed = False signum = -self.exitcode sigdesc = SIGNAL_DESCRIPTIONS.get(signum,"") self.messages.append(f"Received signal {signum} {sigdesc}") elif self.skip_exitcode is False and self.exitcode != self.exitcode_expect: self.passed = False self.messages.append(f"Program had exit code {self.exitcode} when {self.exitcode_expect} is expected") if not self.diff_passed: self.passed = False self.messages.append("Output Differenes: Expected/Actual do not match, check Diff Sections for details") def separate_valgrind_output(self): """Separate valgrind output from output_actual. Populates the output_valgrind field with the lines that look like they are output from Valgrind. """ if not self.use_valgrind: return val_re = re.compile(f"=={self.pid}==" + r".*?\n") # matches valgrind output lines, technically should not need the ? # for non-greedy matching as Python REs do not match \n by default # but an leaving this in to try to express intent self.output_valgrind = "".join(re.findall(val_re,self.output_actual)) self.output_actual = re.sub(val_re,"",self.output_actual,count=0) def run_post_filter(self): """Run output_actual through a specified filter to modify it""" if not self.post_filter: return # older versions of python do not support the text/capture_output # parameters to subprocess.run() so work-arounds are used instr = bytes(self.output_actual,'utf-8') # apparently must feed intput as bytes... result = subprocess.run(self.post_filter, # subprocess for filter program input=instr, # input is program output stdout=subprocess.PIPE, # capture output via a pipe stderr=subprocess.STDOUT, # merge stderr and stdout shell=True, # run in a shell: no shell lex # capture_output=True, # ADDED: python 3.6 (save the output in the process) # text=True, # ADDED: python 3.6 (text only) check=True) # exception if filter fails self.output_actual = result.stdout.decode('utf-8') # return a normal string def set_bash_opts(self): """Reset segmeent to run as a default bash program""" self.program = "bash -v" # program to run for the session; special behavior for QUOTE and COMMENT self.prompt = ">>" # prompt string used by the program being run self.echoing = "input" # style of echoing done by the program, "input" echoes input, "both" for prompt+input echoing self.use_valgrind = False # whether to run program under valgrind self.valgrind_opts = "" # additional options to pass to valgrind such as suppression self.post_filter = None # filter to run on output after test completes, before verifying output self.test_directory = None # directory to use for this test or None if run in the working directory ################################################################################ ## End Segment class ################################################################################ ### Global Utility Functions def limited_communicate(to_fd,from_fd,to_str=None,timeout=None,max_out_bytes=None): """Communicate on a to/from pipe with limits Write to_str to the to_fd in chunks and receive data from from_fd in chunks. If this takes longer than timeout (fractional seconds), bail. If more than from_bytes are read, bail. Returns a tuple of (from_str, timed_out T/F, maxed_out T/F) Both to_fd and from_fd are closed by the end of this routine. Makes use of poll() under the hood to synchornously handle the I/O and avoid blocking for too long. """ poll_timeout_millis = 100 # timeout for poll calls block_size = 4096 # size of blocks of data to communicate pollset = select.poll() # set of file descriptors to track pollset.register(to_fd, POLLOUT) pollset.register(from_fd, POLLIN) from_bytes = bytearray() # blocks read from from_fd from_eof = False # reached tht end of from_fd total_time = 0.0 # total time elapsed in the loop to_pos = 0 # advancing position write in to_str beg_time = time.monotonic() # start time of main loop zero_read_count = 0 # number of times a read() of zero length occurs zero_read_limit = 5 # reaching this number of 0 read counts terminates the input loop to_bytes = None if to_str: to_bytes = bytes(to_str,'utf-8') (loop_count,write_count,read_count) = 0,0,0 # for debugging log.debug(f"communicate loop start: to_fd {to_fd} from_fd {from_fd} POLLIN {POLLIN} POLLOUT {POLLOUT} POLLERR {POLLERR} POLLHUP {POLLHUP}") while ((from_fd > 0) and # other from still has data (not timeout or total_time <= timeout) and # still within timeout (not max_out_bytes or len(from_bytes) <= max_out_bytes)): # still under max bytes read loop_count += 1 fileops = pollset.poll(poll_timeout_millis) log.debug(f"fileops: {fileops}") # handle 1 event per loop, slightly less efficient than multiple # events BUT handling the POLLIN even first and then re-polling # allows for easier checking of error conditions and ensures easy # handling of the POLLIN even before POLLHUP for (fd,event) in fileops: if (fd==to_fd and event&POLLOUT): # can write to other side if to_bytes and to_pos < len(to_bytes): # have some left to write, do so log.debug(f"to_fd {fd} ready; writing to it") write_count += 1 end_pos = min(len(to_bytes),to_pos+block_size) to_pos += os.write(to_fd,to_bytes[to_pos:end_pos]) else: # IMPORTANT: close outward stream log.debug(f"to_fd {fd} ready but end of output; closing it") pollset.unregister(to_fd) # when all data written so the os.close(to_fd) # other side knows it's done elif fd==from_fd and event&POLLIN: # can read from inward stream read_count += 1 block = os.read(from_fd,block_size) from_bytes.extend(block) log.debug(f"from_fd {fd} input ready, read {len(block)} bytes from it") if len(block)>0: # non-zero read length; reset count of zero_read_limit = 0 # zer-read length. A 0-read length else: # MAY indicate inward stream closed zero_read_count += 1 # Some UNIX platforms use POLLIN with a 0 if zero_read_count >= zero_read_limit: # read length to indicate EOF rather than from_eof = True # POLLHUP; count a few 0-len reads to check log.debug(f"from_fd {fd} 0-read count limit reached, closing") elif fd==from_fd and event&POLLHUP: # inward stream definitely closed, terminate from_eof = True # I/O loop log.debug(f"from_fd {fd} POLLHUP found, closing") os.close(from_fd) from_fd = -1 else: log.debug(f"Uknown event: fd {fd} even {event}") total_time = time.monotonic() - beg_time if from_fd > 0: # pipe still open so likely timeout/maxout os.close(from_fd) # close this side of the pipe log.debug(f"communicate loop done") log.debug(f"loop_count: {loop_count} write_count: {write_count} read_count: {read_count}") log.debug(f"total_time: {total_time:.4f} len(from_bytes): {len(from_bytes)}") return (from_bytes, timeout is not None and total_time > timeout, max_out_bytes is not None and len(from_bytes) > max_out_bytes) def global_alignment(seqA, seqB, scorefunc="RQR", score_min=-0.25, score_max=1.0, gap_cost=-0.1, blank_func=lambda x: False): """Compute global sequence alignmnet of lines in seqA and seqB Uses the Needlman-Wunsch algorithm (common to bioinformatics sequence alignment). This adaptation assumes seqA and seqB are lists of lines (strings) and the alignment is a diff-like aligmnet between the lines. Returns a pair of (matches,tot_score). tot_score is the overall score for the alignment with higher numbers meaning a "better" alignment. In the context of comparing expected/actual output, the number means very little and is not used. matches is a list of the form [(ai, bj, tag, score)...] where the ai,bj are the index of elements in seqA,seqB that match at that position, score is the score contritubed by that part of the match, and tag describes the alignment action, one of - "equal": seqA(ai) and seqB(bj) lines were equal and matched - "replace": seqA(ai) and seqB(bj) where matched with some differences - "delete": seqA(ai) doesn't match, should be deleted to transform to seqB - "insert": seqB(bj) doesn't match, should be inserted to transform from seqB scorefunc: The default options use a fast comparison of lines (SequenceMather.real_quick_ratio(None,lineA,lineB)) but by adjsting from RQR to QR or R the comparisons get more accurate at comparing lines at the cost of increasing the comutation time. For standard diff-like behavior, RQR with the other default parameters is pretty close. score_min and score_max scale the scoring of matches (equal/replace). The ratio()-based function used are normalized to 0.0-1.0 but are scaled to between score_min and score_max so that badly mismatched lines can be penalized in favor of gaps (insert/delete). gap_cost adjusts the cost of adding gaps (insert/delete) in the alignment. blank_func is function which determines when a line is blank; blank lines are treated specially and have 0 cost in insertion/deletion. The defaults for score_min, score_max, are set so that if lineA and lineB have no characters in common, the alignment will favor an insert/delete over a replace. """ scorefuncs = { # compare seq elements for scoring "RQR": lambda seqmatcher: seqmatcher.real_quick_ratio(), # fastest with least accuracy "QR" : lambda seqmatcher: seqmatcher.quick_ratio(), # fast with more accuracy "R" : lambda seqmatcher: seqmatcher.ratio(), # slow with high accuracy } match_score = scorefuncs.get(scorefunc,None) if not match_score: raise Exception("scorefunc '{scorefunc}' is not known, options are {scorefuncs.keys()}") score_scale = score_max-score_min ablank = [blank_func(x) for x in seqA] bblank = [blank_func(x) for x in seqB] (m,n) = (len(seqA), len(seqB)) # set up NW score/trace matrix scoremat = [[None]*(n+1) for _ in range(m+1)] # seqA along rows, seqB along cols for i in range(0, m + 1): # initialize matrices for opening gaps scoremat[i][0] = (gap_cost*i,(i-1,0,"delete",gap_cost)) for j in range(0, n + 1): scoremat[0][j] = (gap_cost*j,(0,j-1,"insert",gap_cost)) scoremat[0][0] = (0,(0,0,"done",0)) for i in range(1, m + 1): # initialize matrices for opening gaps if ablank[i-1]: scoremat[i][0] = (scoremat[i-1][0][0],(i-1,0,"delblank",0)) else: scoremat[i][0] = (scoremat[i-1][0][0]+gap_cost,(i-1,0,"delete",gap_cost)) for j in range(1, n + 1): if bblank[j-1]: scoremat[0][j] = (scoremat[0][j-1][0],(0,j-1,"insblank",0)) else: scoremat[0][j] = (scoremat[0][j-1][0]+gap_cost,(0,j-1,"insert",0)) # MAIN SCORING ALGORITHM for i in range(1, m+1): seqmatcher = SequenceMatcher(None,"",seqA[i-1]) # seqmatcher optimized for stable seq2, vary seq1 more frequentyly for j in range(1, n + 1): if seqA[i-1]==seqB[j-1]: # equal elems score a 1.0 (tag,raw_score) = ("equal",1.0) else: # unequal elems score in range -0.2 to 1.0 seqmatcher.set_seq1(seqB[j-1]) # use seqmatcher for comparison (tag,raw_score) = ("replace",match_score(seqmatcher)) mscore = raw_score*score_scale + score_min # scale the score in range scores = [(scoremat[i-1][j-1][0] + mscore,(i-1,j-1,tag,mscore)), # match seq1/2 elem (scoremat[i-1][j][0] + gap_cost,(i-1,j,"delete",gap_cost)), # delete seq1 elem (scoremat[i][j-1][0] + gap_cost,(i,j-1,"insert",gap_cost))] # insert seq2 elem if ablank[i-1]: scores.append((scoremat[i-1][j][0],(i-1,j,"delblank",0))) if bblank[j-1]: scores.append((scoremat[i][j-1][0],(i,j-1,"insblank",0))) scoremat[i][j] = max(scores) # TRACEBACK COMPUTATION align_score = scoremat[-1][-1][0] # overall score of the global alignmnet align = [] # traceback alignment, elements (seq1,seq2,tag,cost) i,j = m,n while i > 0 or j > 0: (_,info) = scoremat[i][j] align.append(info) i=info[0] j=info[1] align.reverse() # reverse the alignment in place return (align, align_score) def sbs_diff(align,alines,blines, titles=(None,None), sym_equal=' ',sym_replace='|', sym_delete='<',sym_insert='>', sym_delblank='(', sym_insblank=')'): """Create a side-by-side diff string from an alignment""" awidth = max((len(x) for x in alines), default=0) bwidth = max((len(x) for x in blines), default=0) if titles != (None,None): awidth = max(awidth,len(titles[0])) bwidth = max(bwidth,len(titles[1])) with StringIO() as diff: if titles != (None,None): writeline(diff,f'{titles[0]:{awidth}} {titles[1]}') for (ai,bj,tag,_) in align: if tag=="delete": writeline(diff,f"{alines[ai]:{awidth}} {sym_delete} {''}") elif tag=="delblank": writeline(diff,f"{alines[ai]:{awidth}} {sym_delblank} {''}") elif tag=="insert": writeline(diff,f"{'':{awidth}} {sym_insert} {blines[bj]}") elif tag=="insblank": writeline(diff,f"{'':{awidth}} {sym_insblank} {blines[bj]}") else: sym = sym_equal if tag=="equal" else sym_replace writeline(diff,f"{alines[ai]:{awidth}} {sym} {blines[bj]}") return diff.getvalue() def lbl_diff(align,alines,blines, titles=("OLD","NEW")): """Create a line-by-line diff showing only non-matching lines from alignment The comparison is literally line-by-line rather than block-by-block as is the case """ (atit,btit) = (titles[0],titles[1]) twidth = max(len(atit),len(btit)) with StringIO() as diff: for (ai,bj,tag,_) in align: # ignore equal, ins/del blank as these if tag=="delete": # are not the source of failures writeline(diff,f"{atit:<{twidth}} {ai+1:>3d}) {alines[ai]}") writeline(diff,f"{btit:<{twidth}} {'--':>3}) ") elif tag=="insert": writeline(diff,f"{atit:<{twidth}} {'--':>3}) ") writeline(diff,f"{btit:<{twidth}} {bj+1:>3d}) {blines[bj]}") elif tag=="replace": writeline(diff,f"{atit:<{twidth}} {ai+1:>3d}) {alines[ai]}") writeline(diff,f"{btit:<{twidth}} {bj+1:>3d}) {blines[bj]}") if tag in {'delete','insert','replace'}: writeline(diff,'') return diff.getvalue() def get_keyval(string,localenv): """For 'key=val', returns (key,val) Accept a string of the form 'key=val' and separate the key/value pair. Removes whitespace if needed around begining/end of string. Raises an exception if there is a formatting problem. """ string = string.strip() if "=" not in string: raise ParseError(f"key=value string [{string}] is not formatted correctly") (key,val)=string.split("=",1) key = key.lower() # all lower case keys try: val = eval(val,localenv) except Exception as e: raise ParseError(f"[{string}] is not valid syntax for right-hand side") from e return (key,val) def slurp(filename): """Read an entire file into memory""" with open(filename,encoding='utf-8') as f: return str(f.read()) def shave_blanks(line_list): """Elimnate whitespace-only strings from beginning/end of list""" # NOTE: for strings, can use the trim() function which would be # preferred to this beg = 0 for line in line_list: if not re.fullmatch(r"\s*",line): break beg += 1 end = len(line_list) for line in reversed(line_list): if not re.fullmatch(r"\s*",line): break end -= 1 return line_list[beg:end] def writeline(file,line,end='\n'): """Write line of text to file output; reverses file/text order in call for nicer formatting.""" print(line,file=file,end=end) def run_test(arg): """Helper for multiprocess mapping to run a test Accept (test,result_formatter) and run the test then use the formatter to format the results. Global helper required due to limitations in Python's API for multiprocessing and limits in scoping/pickling. Used with imap() in multiprocessing so returns the completed test. """ (test, result_formatter) = arg # destructure arg test.run() # run test and result_formatter.add_test_result(test) # format output result_formatter.make_test_result_file(test) result_formatter.make_test_rawfiles(test) return test # return completed test def testy_waitstatus_to_exitcode(ws): """Converts a wait status to the exit code of a program or negative number if the program was signalled. This is a compatibility function for Python ver<3.9 when this function was added to the os module as os.waitstatus_to_exitcode(w). The below implementation follows the C code in cpython/Modules/posixmodule.c to produce similar effects.""" if os.WIFEXITED(ws): return os.WEXITSTATUS(ws) elif os.WIFSIGNALED(ws): return -os.WTERMSIG(ws) elif os.WIFSTOPPED(ws): raise ValueError(f"process stopped by delivery of signal {os.WSTOPSIG(ws)}") else: raise ValueError(f"invalid wait status: {ws}") ################################################################################ ### Parsing classes and hierarchy class ParseError(Exception): """Exception to throw when a parsing error occurred""" class FilePos: """Encodes a file position for error reporting""" def __init__(self, fname): self.filename = fname self.linenum = 0 class SuiteParser: """Interface for test file parsers. Represents shared functionality of parsers. Implementing classes should override parse_file(filename) which will open a given file, parse it, and return a Suite """ def parse_file(self,filename): """Parse a file (abstract method) To be overriden by derived classes. Opens and reads the contents of filename and returns a Suite read from it. """ def regen_file(self, fname, suite): """Output suite to file as expected test results Write the results present in suite to fname such that they actual output is treated as the expected output and tests can be rerun to pass. """ # other shared methods built from parse_file can go here such as # parsing_string which can simply convert the given string to a # StringIO and then call parse_file class OrgSuiteParser: """Handle Emacs Org formatted test files""" def parse_file(self,filename): """Parse an Emacs Org formatted files Org files are the traditional format to create readable, compact test files. This function parses an Org file and builds a Suite from it. """ contents = slurp(filename) test_regex = re.compile(r"(^\* .*\n)",re.MULTILINE) # split content into tests based tc_list = re.split(test_regex,contents) # on test headers tc_len = len(tc_list) if tc_len <= 1: msg = f"{filename} does not contain any tests" raise ParseError(msg) preamble = tc_list[0] # everything before first '* title' test_titles_contents = \ [(t,c) for (t,c) # title/content together for iteration in zip(tc_list[1:tc_len:2], tc_list[2:tc_len:2]) if not t.startswith('* COMMENT')] # filter commented tests filepos = FilePos(filename) # track global file position try: # try block for parsing errors suite = self.parse_suite_preamble(preamble,filepos) for (tidx,(test_title,test_content)) in enumerate(test_titles_contents): testnum=tidx+1 test = self.parse_test(test_title, test_content, suite.test_opts, filepos) test.testnum = testnum suite.points_possible += test.points suite.tests[testnum] = test except ParseError as e: msg = f"{filepos.filename}:{filepos.linenum}: {str(e)}" raise ParseError(msg) from e # decorate parsing errors position except ValueError as e: msg = f"{filepos.filename}:{filepos.linenum}: {str(e)}" raise ParseError(msg) from e # decorate parsing errors position suite.filename = filename return suite def parse_suite_preamble(self, preamble, filepos): """Preamble parsing preceding the first test of org file tests""" suite = Suite() suite.preamble = preamble with StringIO() as desc: for line in preamble.splitlines(): filepos.linenum += 1 # track line number for error reporting (first,rest) = ("",line) if " " in line: (first, rest) = line.split(" ",1) # extract the first token on the line first = first.upper() # upper case for case insensitive matching if first == "#+TITLE:": # title as in [#+TITLE: Tests for blather] suite.title = rest elif first == "#+TESTY:": # option directive like [#+TESTY: program='bc -iq'] (key,val) = get_keyval(rest,locals()) # raises an exception if badly formatted if key in suite.__dict__: suite.__dict__[key] = val # python objects are dicts, exploit this to assign the value log.debug(f"[{key}={val}] suite option set in file preamble") else: suite.test_opts[key] = val log.debug(f"[{key}={val}] test/segment option set in file preamble") elif len(line)>0 and line[0]!="#": # non org-comments added to description writeline(desc,line) suite.description = desc.getvalue().strip() suite.purge_bash_funcs_envvars() suite.honor_envvars() # override premable opts with environment vars return suite def parse_test_preamble(self, test, preamble, filepos): """Preamble parsing preceding the first segement of org file test""" test.preamble = preamble with StringIO() as desc: for line in preamble.splitlines(): filepos.linenum += 1 # track line number for error reporting (first,rest) = ("",line) if " " in line: (first, rest) = line.split(" ",1) # extract the first token on the line first = first.upper() # upper case for case insensitive matching if first == "#+TESTY:": # option directive like [#+TESTY: program='bc -iq'] (key,val) = get_keyval(rest,locals()) # raises an exception if badly formatted if key in test.__dict__: test.__dict__[key] = val # python objects are dicts, exploit this to assign the value log.debug(f"[{key}={val}] test option set in test preamble ({filepos.filename}:{filepos.linenum})") else: # don't propogate test options down to segments msg = f"{filepos.filename}:{filepos.linenum}: Invalid test option {key}={val}" raise ParseError(msg) elif len(line)>0 and line[0]!="#": # non org-comments added to description writeline(desc,line) test.description = desc.getvalue().strip() return def parse_test(self,test_title,content,opts,filepos): """Parse a single test from org test file""" test = Test(opts) test.title = test_title[2:-1] # remove "* " and newline filepos.linenum += 1 test.linenum = filepos.linenum test.filename = filepos.filename # propagate_fields(test,opts) # TODO Handle :PROPERTIES: drawer here segstart_regex = re.compile(r"^\*\* ",re.MULTILINE) segstart = re.search(segstart_regex,content) if segstart: preamble = content[:segstart.start(0)] # extract the preamble and parse it self.parse_test_preamble(test,preamble, filepos) # content = content[segstart.start(0):] # remove preamble from test body content seg_regex = re.compile(r"^#\+(?:END_SRC|END_QUOTE).*\n", # split on ending tokens for re.MULTILINE | re.IGNORECASE) # segments; note that a line seg_contents = re.split(seg_regex, content) # is excised from resulting list for segc in seg_contents: if re.fullmatch(r"\s*",segc): # completely blank region filepos.linenum += segc.count("\n") # add line count and continue # advance segment = self.parse_segment(segc,opts,filepos) if hasattr(segment,"points"): # hack to convey points into test test.points = segment.points test.segments.append(segment) filepos.linenum += 1 # add excised line return test def parse_segment(self,content,opts,filepos): """Parse a single segment from org test file""" # TODO: Handle TESTY_RERUN directive somehow... segment = Segment(opts) # propagate_fields(segment,opts) segment.linenum = filepos.linenum+1 # first line of segment is next seg_regexs = r"(^#\+(?:BEGIN_SRC|BEGIN_QUOTE).*\n)" # split content into premable/session; seg_regex = re.compile(seg_regexs, # retain the SRC/QUOTE as it is re.MULTILINE|re.IGNORECASE) if re.search(seg_regex, content) == None: (preamble,token1,session) = (content,"COMMENT","") # handle cases where text trails session else: # preamble followed by session token and body (preamble,token1,session) = re.split(seg_regex, content) segment.preamble = preamble with StringIO() as desc: # handle the preamble, comments and for line in preamble.splitlines(): # prior to the first test filepos.linenum += 1 (first,rest) = ("",line) if " " in line: (first, rest) = line.split(" ",1) # extract the first token on the line first = first.upper() # upper case for case insensitive matching if first=="#+TESTY:" and rest[0]=="!": # shell command as in [#+TESTY: !rm file.txt] segment.shell_precommands.append(rest[1:]) # peel off the ! at the start elif first == "#+TESTY:" and rest=='set_bash_opts': # reset to bash options log.debug(f"Setting segment to bash options") segment.set_bash_opts() elif first == "#+TESTY:": (key,val) = get_keyval(rest,locals()) # raises exception if badly formatted segment.__dict__[key] = val log.debug(f"[{key}={val}] segment option set in segment preamble") elif len(line)>0 and line[0]!="#": # non org-comments added to description writeline(desc,line) segment.description = desc.getvalue().strip() # eliminate beg/end blanks comments if token1 == "COMMENT": segment.program = "COMMENT" elif token1.upper().startswith("#+BEGIN_QUOTE"): # is a quote-based session to create quote_file = token1.split()[1] # a file with given contents segment.program = f"QUOTE {quote_file}" # overwrite program with that data filepos.linenum += 1 # accounts for #+BEGIN_QUOTE else: filepos.linenum += 1 # accounts for #+BEGIN_SRC segment.session = session filepos.linenum += session.count("\n") # update based on #lines in session return segment def regen_file(self, fname, suite): """Regenerate org test file based on results in suite Creates a new test file with all segments based on the output_actual of the given suite. Allows a solution program to produce expected results. """ with open(fname,'w') as out: writeline(out,suite.preamble,end='') for test in suite.tests_torun: # iterate over tests in suite title = test.title or 'Test' writeline(out,f'* {title}') writeline(out,test.preamble,end='') for seg in test.segments: # iterate over segments in test if len(test.segments) > 1 or seg.title: title = seg.title or 'Segment' writeline(out,f'** {title}') writeline(out,seg.preamble,end='') writeline(out,f'#+BEGIN_SRC sh') writeline(out,seg.output_actual,end='') # output_actual used during regen writeline(out,f'#+END_SRC') writeline(out,f'') # end seg loop # end test loop # end with file ################################################################################ ### Progress reporting class ProgressReporter: """Base class for reporting progress while tests run.""" def __init__(self,suite): """Initialize based on parsed suite.""" def report_suite_start(self,suite): """Print header information of the suite after parsing""" def report_test_done(self,test): """Print info on the completion of a test.""" def report_suite_done(self,suite): """Print ending information on a suite completing""" # END class ProgressReporter class LineProgressReporter(ProgressReporter): """Print one line for each test run""" def __init__(self,suite): """Currently this version calculates some widths to allow for nicer output""" self.testnum_width = max([len(f"{t.testnum}") for t in suite.tests_torun]) self.test_title_width = max([len(t.title) for t in suite.tests_torun]) if suite.filename is None and suite.title is None: self.header = f"== Unnamed Suite" elif suite.filename is not None and suite.title is None: self.header = f"== {suite.filename}" elif suite.filename is None and suite.title is not None: self.header = f"== {suite.title}" else: self.header = f"== {suite.filename} : {suite.title}" def report_suite_start(self,suite): """Print header information of the suite after parsing""" print("="*len(self.header)) print(self.header) print(f"== Running {len(suite.tests_torun)} / {len(suite.tests)} tests") def report_test_done(self,test): """This version prints one line per test with ok/FAIL. """ result_string = "ok" if test.passed is False: if test.result_filename is None: result_string = "FAIL" else: result_string = f"FAIL -> see {test.result_filename}" print(f"{test.testnum:{self.testnum_width}}) {test.title:{self.test_title_width}} : {result_string}") def report_suite_done(self,suite): """Prints out number of tests passed / points earned.""" # TODO: Add SHOW output here print("="*len(self.header)) if suite.use_points: passed = suite.points_earned possible = suite.points_possible print(f"RESULTS: {passed:.2f} / {possible:.2f} points ") else: passed = suite.passed_tests possible = len(suite.tests_torun) print(f"RESULTS: {passed} / {possible} tests passed") # END class LineProgressReporter class ShortProgressReporter(ProgressReporter): """Provides shorter form reporting of testing progress Each test passed is either a . for pass or a F for fail""" def __init__(self,suite): """Init tracking of test number printing""" self.tests_run = 0 self.total_tests = len(suite.tests_torun) def report_suite_start(self,suite): """Print header information of the suite after parsing""" print(f"Running {len(suite.tests_torun)} / {len(suite.tests)} tests") print("|",end="") for i in range(5,min(40,self.total_tests)+1,5): print(f"{i:5d}",end="") # print a numeric guide for tests print(f"{'':{self.total_tests%5}}|") print("|",end="",flush=True) def report_test_done(self,test): """Print . for pass and F for fail""" if test.passed: print(".",end="",flush=True) # . for pass else: print("F",end="",flush=True) # F for fails self.tests_run += 1 if self.tests_run == self.total_tests: # end the with vertical bar print("|") elif self.tests_run%40 == 0: # every 40 carry to another line print("|") print("|",flush=True) def report_suite_done(self,suite): """Print number of tests passed / points earned and failing test files""" if suite.use_points: passed = suite.points_earned possible = suite.points_possible print(f"RESULTS: {passed:.2f} / {possible:.2f} points ") else: passed = suite.passed_tests possible = len(suite.tests_torun) print(f"RESULTS: {passed} / {possible} tests passed") failed_tests = [t for t in suite.tests_torun if not t.passed] if failed_tests: testnum_width = max([len(f"{t.testnum}") for t in failed_tests]) test_title_width = max([len(t.title) for t in failed_tests]) for test in failed_tests: result_string = "FAIL" if test.result_filename: result_string = f"FAIL -> see {test.result_filename}" print(f"{test.testnum:{testnum_width}}) {test.title:{test_title_width}} : {result_string}") # END class ShortProgressReporter ################################################################################ ### Result Formatting class ResultFormatter: """Format results for each test / entire suite""" def __init__(self,suite): """Initialize any state needed based on the suite""" def add_test_result(self,test): """Add result field to the test after running it""" def make_test_result_file(self,test): """Create a result file for this individual test if indicated by its fields""" def add_suite_result(self,suite): """Add result field to the suite after running it""" def make_suite_result_file(self,test): """Create a result file for the entire suite""" def make_test_rawfiles(self,test): """Create raw output files if requested This method may be inherited by all formatters and used without modification """ if not test.save_rawfiles: return raw_dir = os.path.join(test.results_dir, test.raw_dir) subprocess.run(['mkdir','-p',raw_dir],check=True) raw_fields = ["output_actual","output_expect", "output_original", "input_str"] for (segi,seg) in enumerate(test.segments): for field in raw_fields: fieldstr = seg.__dict__[field] # fields like QUOTE files may be empty if fieldstr: # so filter those out fname = f"{test.prefix}-{test.testnum:02d}-s{segi:02d}-{field}.txt" with open(os.path.join(raw_dir,fname),"w") as out: out.write(fieldstr) if not seg.passed: break class OrgResultFormatter(ResultFormatter): """Create Org output. Suite result is just a concatenation for all results.""" def __init__(self,suite): """Initialize any state needed based on the suite""" def single_seg_test_result(self,test): """Add results to a single-segment test; segment results are not nested""" if len(test.segments) != 1: raise Exception(f"test {test.testnum} has {len(test.segments)}, can't format it") res = StringIO() okfail = "ok" if test.passed else "FAIL" seg = test.segments[0] writeline(res, f'* (TEST {test.testnum}) {test.title} : {okfail}') writeline(res, f'** COMMENTS') if test.description: writeline(res, f'{test.description}\n') writeline(res, f'{seg.description}\n') writeline(res, f'** PROGRAM: {seg.program}') if seg.test_directory: writeline(res, f'Test Directory: {seg.test_directory}') writeline(res, f'Change to this directory to see test data\n') writeline(res, f'To run this individual test in GDB use the command:') writeline(res, f' gdb --args {seg.program}') writeline(res, f'but any input to the program must be typed within the debugger') writeline(res, '') writeline(res, f'** FAILURE MESSAGES') for msg in seg.messages: writeline(res, f'- {msg}') if not seg.messages: writeline(res, f'None') writeline(res, f'') writeline(res, f'** SIDE-BY-SIDE DIFF of Expected vs Actual') if seg.skip_diff: writeline(res, f'NOTE: skip_diff is True so differences are ignored') writeline(res, f'{seg.sbs_diff_key}') writeline(res, f'') writeline(res, f'#+BEGIN_SRC sdiff') writeline(res, f'{seg.sbs_diff}') writeline(res, f'#+END_SRC') writeline(res, f'') writeline(res, f'** LINE-BY-LINE DIFF of Expected vs Actual') if not seg.lbl_diff: writeline(res, f'No differences found') else: writeline(res, f'#+BEGIN_SRC text') writeline(res, f'{seg.lbl_diff}',end="") writeline(res, f'#+END_SRC') writeline(res, f'') if not seg.use_valgrind: writeline(res, f'** VALGRIND Not in Use') else: fullprog = " ".join(seg.full_program) writeline(res, f'** VALGRIND REPORT') writeline(res, f'The program is run on under valgrind as') writeline(res, f' {fullprog}') writeline(res, f'which may be pasted onto a command line to run it.') writeline(res, f'') writeline(res, f'#+BEGIN_SRC text') writeline(res, f'{seg.output_valgrind}',end='') writeline(res, f'#+END_SRC') writeline(res, f'') writeline(res, f'** SUMMARY') if test.passed: writeline(res, f'Test Passed') else: writeline(res, f'Test FAILED for the following reasons') for msg in seg.messages: writeline(res, f'- {msg}') test.result = res.getvalue() res.close() def multiple_seg_test_result(self,test): """Add results to multiple-segment test; segment results are nested""" failseg = None res = StringIO() okfail = "ok" if test.passed else "FAIL" writeline(res, f'* (TEST {test.testnum}) {test.title} : {okfail}') writeline(res, f'** COMMENTS') writeline(res, f'{test.description}') writeline(res, f'') for (segi,seg) in enumerate(test.segments): okfail = "ok" if seg.passed else "FAIL" writeline(res, f'** Test Segment {segi+1} : {okfail}') if seg.description: writeline(res, seg.description) writeline(res, f'') writeline(res, f'*** PROGRAM: {seg.program}') if seg.program == "COMMENT": # commented segment writeline(res,f'This segment has no runnable program and is present') writeline(res,f'only as a comment. It will always pass.') elif seg.program.startswith("QUOTE"): # QUOTE segment to create a file quote_file = seg.program.split()[1] writeline(res,f'This segment create the file `{quote_file}`') writeline(res,f'It will always pass.') else: # normal segment that runs a program writeline(res, f'To run this individual test in GDB use the command:') writeline(res, f'gdb --args {seg.program}') writeline(res, f'but any input to the program must be typed within the debugger') writeline(res, f'') writeline(res, f'*** FAILURE MESSAGES') for msg in seg.messages: writeline(res, f'- {msg}') if not seg.messages: writeline(res, f'None') writeline(res, f'') writeline(res, f'*** SIDE-BY-SIDE DIFF of Expected vs Actual') if seg.skip_diff: writeline(res, f'NOTE: skip_diff is True so differences are ignored') writeline(res, f'{seg.sbs_diff_key}') writeline(res, f'') writeline(res, f'#+BEGIN_SRC sdiff') writeline(res, f'{seg.sbs_diff}') writeline(res, f'#+END_SRC') writeline(res, f'') # TODO: get line-by-line differences writeline(res, f'*** LINE-BY-LINE DIFF of Expected vs Actual') if not seg.lbl_diff: writeline(res, f'No differences found') else: writeline(res, f'#+BEGIN_SRC text') writeline(res, f'{seg.lbl_diff}',end="") writeline(res, f'#+END_SRC') writeline(res, f'') if not seg.use_valgrind: writeline(res,f'*** VALGRIND Not in Use') else: fullprog = " ".join(seg.full_program) writeline(res, f'*** VALGRIND REPORT') writeline(res, f'The program is run on under valgrind as') writeline(res, f'{fullprog}') writeline(res, f'which may be pasted onto a command line to run it.') writeline(res, f'') writeline(res, f'#+BEGIN_SRC text') writeline(res, f'{seg.output_valgrind}',end='') writeline(res, f'#+END_SRC') writeline(res, f'') if not seg.passed: # segments beyond first failure failseg = seg break # will not have been run # END loop over segments writeline(res, f'** SUMMARY') if test.passed: writeline(res, f'Test Passed') else: writeline(res, f'Test FAILED for the following reasons') for msg in failseg.messages: writeline(res, f'- {msg}') test.result = res.getvalue() res.close() def add_test_result(self,test): """Create Org formatted output for this test""" if len(test.segments)==1: self.single_seg_test_result(test) else: self.multiple_seg_test_result(test) def make_test_result_file(self,test): """Create an Org result file for this individual test if indicated by its fields""" # TODO: Add creation of raw files if requsted if not test.make_result_file: return subprocess.run(['mkdir','-p',test.results_dir],check=True) # ensure results diretory is present fname = f'{test.prefix}-{test.testnum:02d}-result.org' test.result_filename = os.path.join(test.results_dir, fname) with open(test.result_filename,'w') as f: f.write(test.result) def add_suite_result(self,suite): """Add overall result in Org format to suite""" with StringIO() as res: writeline(res, f'#+TITLE: {suite.title} Results') writeline(res, f'{suite.description}') writeline(res, f'') for test in suite.tests_torun: writeline(res, test.result) suite.result = res.getvalue() def make_suite_result_file(self,suite): """Create a Org result file for the entire suite""" if not suite.overall_result_file: return basename = os.path.splitext(suite.filename)[0] # remove extension suite.result_filename = f'{basename}-results.org' with open(suite.result_filename,'w') as f: f.write(suite.result) ################################################################################ def md_write_heading(stringio,level,heading): """Write an underlined heading in markdown format. The length of the underlining is dictated by level with 1 and 2 being underlined with = and - respectively followed by a linebreak and lower levels having an appropriate number of # characters prefixed.""" if level <= 0: raise Exception(f'Level {level} headlines not supported') elif level > 2: writeline(stringio, f'{"#"*level} {heading}') else: underline = "=" # level 1 underline if level > 1: underline = '-' # level 2 underline writeline(stringio, heading) writeline(stringio, underline*len(heading)) if level==1: writeline(stringio, '') class MarkdownResultFormatter(ResultFormatter): """Create Markdown output. Suite result is just a concatenation for all results. This version favors 'underlined' headlines to make them stand out more when shown in terminal output. """ def __init__(self,suite): """Initialize any state needed based on the suite""" def single_seg_test_result(self,test): """Add results to a single-segment test; segment results are not nested""" if len(test.segments) != 1: raise Exception(f"test {test.testnum} has {len(test.segments)}, can't format it") res = StringIO() okfail = "ok" if test.passed else "FAIL" seg = test.segments[0] md_write_heading(res,1,f'(TEST {test.testnum}) {test.title} : {okfail}') md_write_heading(res,2,f'COMMENTS') if test.description: writeline(res, f'{test.description}\n') writeline(res, f'{seg.description}\n') md_write_heading(res,2,f'PROGRAM: {seg.program}') if seg.test_directory: writeline(res, f'Test Directory: {seg.test_directory}') writeline(res, f'Change to this directory to see test data\n') writeline(res, f'To run this individual test in GDB use the command:') writeline(res, f' gdb --args {seg.program}') writeline(res, f'but any input to the program must be typed within the debugger') writeline(res, '') md_write_heading(res,2,f'FAILURE MESSAGES') for msg in seg.messages: writeline(res, f'- {msg}') if not seg.messages: writeline(res, f'None') writeline(res, f'') md_write_heading(res,2,f'SIDE-BY-SIDE DIFF of Expected vs Actual') if seg.skip_diff: writeline(res, f'NOTE: skip_diff is True so differences are ignored') writeline(res, f'{seg.sbs_diff_key}') writeline(res, f'') writeline(res, f'```sdiff') writeline(res, f'{seg.sbs_diff}') writeline(res, f'```') writeline(res, f'') md_write_heading(res,2,f'LINE-BY-LINE DIFF of Expected vs Actual') if not seg.lbl_diff: writeline(res, f'No differences found') else: writeline(res, f'```') writeline(res, f'{seg.lbl_diff}',end="") writeline(res, f'```') writeline(res, f'') if not seg.use_valgrind: md_write_heading(res,2,f'VALGRIND Not in Use') else: fullprog = " ".join(seg.full_program) md_write_heading(res,2,f'VALGRIND REPORT') writeline(res, f'The program is run on under valgrind as') writeline(res, f' {fullprog}') writeline(res, f'which may be pasted onto a command line to run it.') writeline(res, f'') writeline(res, f'```') writeline(res, f'{seg.output_valgrind}',end='') writeline(res, f'```') writeline(res, f'') md_write_heading(res,2,f'SUMMARY') if test.passed: writeline(res, f'Test Passed') else: writeline(res, f'Test FAILED for the following reasons') for msg in seg.messages: writeline(res, f'- {msg}') test.result = res.getvalue() res.close() def multiple_seg_test_result(self,test): """Add results to multiple-segment test; segment results are nested""" failseg = None res = StringIO() okfail = "ok" if test.passed else "FAIL" md_write_heading(res,1,f'(TEST {test.testnum}) {test.title} : {okfail}') md_write_heading(res,2,f'COMMENTS') writeline(res, f'{test.description}') writeline(res, f'') for (segi,seg) in enumerate(test.segments): okfail = "ok" if seg.passed else "FAIL" md_write_heading(res,2,f'Test Segment {segi+1} : {okfail}') if seg.description: writeline(res, seg.description) writeline(res, f'') md_write_heading(res,3,f'PROGRAM: {seg.program}') writeline(res, f'To run this individual test in GDB use the command:') writeline(res, f'gdb --args {seg.program}') writeline(res, f'but any input to the program must be typed within the debugger') writeline(res, f'') md_write_heading(res,3,f'FAILURE MESSAGES') for msg in seg.messages: writeline(res, f'- {msg}') if not seg.messages: writeline(res, f'None') writeline(res, f'') md_write_heading(res,3,f'SIDE-BY-SIDE DIFF of Expected vs Actual') if seg.skip_diff: writeline(res, f'NOTE: skip_diff is True so differences are ignored') writeline(res, f'{seg.sbs_diff_key}') writeline(res, f'') writeline(res, f'```sdiff') writeline(res, f'{seg.sbs_diff}') writeline(res, f'```') writeline(res, f'') md_write_heading(res,3,f'LINE-BY-LINE DIFF of Expected vs Actual') if not seg.lbl_diff: writeline(res, f'No differences found') else: writeline(res, f'```') writeline(res, f'{seg.lbl_diff}',end="") writeline(res, f'```') writeline(res, f'') if not seg.use_valgrind: md_write_heading(res,3,f'VALGRIND Not in Use') else: fullprog = " ".join(seg.full_program) md_write_heading(res,3,f'VALGRIND REPORT') writeline(res, f'The program is run on under valgrind as') writeline(res, f'{fullprog}') writeline(res, f'which may be pasted onto a command line to run it.') writeline(res, f'') writeline(res, f'```') writeline(res, f'{seg.output_valgrind}',end='') writeline(res, f'```') writeline(res, f'') if not seg.passed: # segments beyond first failure failseg = seg break # will not have been run # END loop over segments md_write_heading(res,3,f'SUMMARY') if test.passed: writeline(res, f'Test Passed') else: writeline(res, f'Test FAILED for the following reasons') for msg in failseg.messages: writeline(res, f'- {msg}') test.result = res.getvalue() res.close() def add_test_result(self,test): """Create Org formatted output for this test""" if len(test.segments)==1: self.single_seg_test_result(test) else: self.multiple_seg_test_result(test) def make_test_result_file(self,test): """Create an Org result file for this individual test if indicated by its fields""" # TODO: Add creation of raw files if requsted if not test.make_result_file: return subprocess.run(['mkdir','-p',test.results_dir],check=True) # ensure results diretory is present fname = f'{test.prefix}-{test.testnum:02d}-result.md' test.result_filename = os.path.join(test.results_dir, fname) with open(test.result_filename,'w') as f: f.write(test.result) def add_suite_result(self,suite): """Add overall result in Org format to suite""" with StringIO() as res: md_write_heading(res,1,f'{suite.title} Results') writeline(res, f'{suite.description}') writeline(res, f'') for test in suite.tests_torun: writeline(res, test.result) suite.result = res.getvalue() def make_suite_result_file(self,suite): """Create a Org result file for the entire suite""" if not suite.overall_result_file: return basename = os.path.splitext(suite.filename)[0] # remove extension suite.result_filename = f'{basename}-results.md' with open(suite.result_filename,'w') as f: f.write(suite.result) ################################################################################ ### Junk that can be ignored ################################################################################ ### Main() setup PARSER_TYPES = { "org" : OrgSuiteParser() } """mapping of file types to parsers""" OUTPUT_TYPES = { "org" : (lambda suite : OrgResultFormatter(suite)), "md" : (lambda suite : MarkdownResultFormatter(suite)), } """mapping of file types to parsers""" PROGRESS_TYPES = { "line" : lambda suite : LineProgressReporter(suite), "short" : lambda suite : ShortProgressReporter(suite), } """options for progres reporting""" SHORT_DESCRIPTION = """\ Run tests for a shell program specified in an org-like file and report the results. """ LONG_DESCRIPTION = """\ ---------------------------------------- --- RUNNING TESTS --- ---------------------------------------- Running a test is done from the command line and will default to running all tests in a provided test file. Output shows each test with a pass/fail and failures have results files indicating what went wrong. """ ################################################################################ ### Main Entry point def main(): """main entry point for command line runs of """ # set up arg parsing, don't line-wrap descriptions argparser = argparse.ArgumentParser(description=SHORT_DESCRIPTION, epilog=LONG_DESCRIPTION, formatter_class=argparse.RawDescriptionHelpFormatter) argparser.add_argument("-d","--debug",default=False,action="store_const",const=True, help="Print debugging messages") argparser.add_argument("-t","--input_type",default="org",action="store", choices=list(PARSER_TYPES.keys()), help="Specify the test input file type") argparser.add_argument("-o","--output_type",default="md",action="store", choices=list(OUTPUT_TYPES.keys()), help="Specify the test results output file type") argparser.add_argument("-p","--progress",action="store",default="line", choices=list(PROGRESS_TYPES.keys()), help="Select progress style reporting") argparser.add_argument("-s","--short",dest="progress",action="store_const",const="short", help="Report progress in short form: . for pass, F for fail") argparser.add_argument("-r","--regen",action="store",default=None, metavar="", help="Regenerate the given test file using actual results, store in given file ") argparser.add_argument("-S","--save-rawfiles",default=False,action="store_const",const=True, help="Save raw input/output files for tests in test-results/raw directory") argparser.add_argument("testfile",metavar="testfile", help="File containing tests (default org format)") argparser.add_argument("testnums",metavar="testnum",nargs='*',type=int, help="Optional test numbers to run (default all)") args = argparser.parse_args() if args.debug or "TESTYDEBUG" in os.environ: log.basicConfig(format=LOGFORMAT,level=log.DEBUG,encoding='utf-8') log.debug("Debugging messages enabled") log.debug(f"'{args.input_type}' input_type selected") if args.input_type not in PARSER_TYPES: print(f"ERROR: test suite file type {args.input_type} is not supported") sys.exit(1) suite_parser = PARSER_TYPES[args.input_type] if args.save_rawfiles: # if more command line options are added, os.environ['save_rawfiles'] = "1" # move these to a function, likely that would try: log.debug(f"Parsing file {args.testfile}") suite = suite_parser.parse_file(args.testfile) if args.regen: suite.setup_regen() testnums_torun = args.testnums or list(range(1,len(suite.tests)+1)) log.debug(f"Selected following tests: {testnums_torun}") for i in testnums_torun: if i <= 0 or i > len(suite.tests): msg = f"'{args.testfile}' has {len(suite.tests)} tests, {i} is out of bounds" raise ParseError(msg) suite.tests_torun = [suite.tests[i] for i in testnums_torun] progress_reporter = PROGRESS_TYPES[args.progress](suite) result_formatter = OUTPUT_TYPES[args.output_type](suite) log.debug(f"Running suite") suite.run(progress_reporter, result_formatter) if args.regen: print(f'Regenerating test suite in file {args.regen}') suite_parser.regen_file(args.regen,suite) except ParseError as e: print(str(e)) sys.exit(1) except KeyboardInterrupt: print() print("testy received a keyboard interrupt; Exiting") sys.exit(1) if suite.show=="all": # show everything, accept "1" for backwards compat print() print("---- Showing All Test Results -----") print(suite.result) elif suite.show=="fail": # show only failures failed_tests = [t for t in suite.tests_torun if t.passed == False] if failed_tests: print() print("---- Showing Failed Test Results -----") for test in failed_tests: print(test.result) print() elif suite.show=="singlefail" and len(suite.tests_torun)==1: test = suite.tests_torun[0] # on single tests, show failures (default) if not test.passed: print() print("---- Single Test Failed -----") print(test.result) else: pass # show nothing if __name__ == '__main__': main()