from collections import namedtuple
import shutil
# NOTE(review): the original imported `T` from typing; typing.T was an
# undocumented internal removed in Python 3.5.3 and is unused here, so it
# was dropped. `Set` was added because the type comments below use it.
from typing import Optional, List, Tuple, Set, Union, Dict
import collections
from mjtest.environment import Environment, TestMode, TEST_MODES
from os.path import join, exists, basename
import logging
import os
import multiprocessing
from mjtest.util.parallelism import available_cpu_count
from mjtest.util.utils import cprint, colored
from pprint import pprint
import difflib

_LOG = logging.getLogger("tests")

# Aggregated outcome of a run: number of executed tests and number of failures.
RunResult = namedtuple("RunResult", ['count', 'failed'])


class TestSuite:
    """
    The whole set of tests.

    Loads the test cases for the configured mode (plus borrowed and CI modes),
    runs them sequentially or in parallel, and stores the names of the test
    cases that passed so they can be skipped on the next run.
    """

    def __init__(self, env: Environment):
        self.env = env
        # Test cases to run, grouped by mode.
        self.test_cases = {}  # type: Dict[str, List[TestCase]]
        # Per mode: file names of test cases that passed in this run.
        self.correct_test_cases = collections.defaultdict(set)  # type: Dict[str, Set[str]]
        self._load_test_cases()

    def _load_test_cases(self):
        """Determine which test-case type folders to load and load them."""
        types = [self.env.mode]  # TEST_MODES#TEST_MODES[TEST_MODES.index(self.env.mode):]
        if self.env.ci_testing:
            types = TEST_MODES
        elif self.env.mode in TestMode.USE_TESTS_OF_OTHER:
            # Some modes reuse the test cases of other modes.
            types += TestMode.USE_TESTS_OF_OTHER[self.env.mode]
        for test_type in types:
            self._load_test_case_type(test_type)

    def _load_test_case_type(self, type: str):
        """Load the test cases of a single type folder if it exists."""
        dir = join(self.env.test_dir, type)
        if exists(dir):
            self._load_test_case_dir(type, dir)
        else:
            _LOG.info("Test folder {} doesn't exist".format(dir))

    def _load_test_case_dir(self, mode: str, dir: str):
        """
        Load all test cases found under ``dir`` for the given mode.

        Reads the log of previously correct test cases (used by the
        ``only_incorrect_tests`` option), walks the directory (skipping the
        ``.preprocessed`` cache folder), preprocesses the files and creates
        the matching TestCase instances.
        """
        correct_test_cases = set()
        log_file = self._log_file_for_type(mode)
        if exists(log_file):
            with open(log_file) as f:
                for t in f.readlines():
                    t = t.strip()
                    if len(t) > 0:
                        self.correct_test_cases[mode].add(t)
                        correct_test_cases.add(t)
        m = mode
        # Tests borrowed from another mode are run under the current mode.
        if m != self.env.mode and self.env.mode in TestMode.USE_TESTS_OF_OTHER and \
                m in TestMode.USE_TESTS_OF_OTHER[self.env.mode]:
            m = self.env.mode
        file_names = []
        for root, dirs, files in os.walk(dir):
            base = os.path.relpath(root, dir)
            if dir == root:
                file_names.extend(files)
            elif base == ".preprocessed":
                # Cache folder of preprocessed files, not real test cases.
                continue
            else:
                file_names.extend(join(base, file) for file in files)
        for file in sorted(file_names):
            if not TestCase.has_valid_file_ending(self.env.mode, file):
                _LOG.debug("Skip file " + file)
            elif self.env.only_incorrect_tests and file in correct_test_cases:
                # Fix: the original log call lacked the .format(file) argument.
                _LOG.info("Skip file {} as its test case was executed correctly the last run".format(file))
            else:
                file_path = join(dir, file)
                if self.env.has_to_preprocess(file_path) and self.env.is_lib_file(file_path):
                    _LOG.debug("Skip lib file '{}'".format(file))
                    continue
                preprocessed = self.env.preprocess(file_path)
                test_case = TestCase.create_from_file(self.env, m, file_path, preprocessed)
                if not test_case:
                    pass
                elif not test_case.can_run():
                    _LOG.debug("Skip test case '{}' because it isn't suited".format(test_case.name()))
                else:
                    if m not in self.test_cases:
                        self.test_cases[m] = []
                    self.test_cases[m].append(test_case)
        if m in self.test_cases and len(self.test_cases[m]) == 0:
            del self.test_cases[m]

    def _log_file_for_type(self, type: str) -> str:
        """Path of the file that records the correct test cases of a type."""
        return join(self.env.test_dir, type, ".mjtest_correct_testcases_" + self.env.mode)

    def _add_correct_test_case(self, test_case: 'TestCase'):
        """Remember that ``test_case`` passed, for the skip log."""
        self.correct_test_cases[test_case.type].add(basename(test_case.file))

    def run(self) -> RunResult:
        """
        Run all loaded test cases and print a summary.

        :return: accumulated RunResult over all modes
        """
        ret = RunResult(0, 0)
        try:
            for mode in self.test_cases.keys():
                if self.env.parallel:
                    single_ret = self._run_parallel(mode, available_cpu_count())
                else:
                    single_ret = self._run_sequential(mode)
                ret = RunResult(ret.count + single_ret.count,
                                ret.failed + single_ret.failed)
        except BaseException:
            logging.exception("")
        finally:
            print("-" * 40)
            if ret.failed > 0:  # some tests failed
                print(colored("Ran {} tests, of which ".format(ret.count), "red") +
                      colored("{} failed.".format(ret.failed), "red", attrs=["bold"]))
            else:
                cprint("All {} run tests succeeded".format(ret.count), "green")
            if self.env.produce_reports and (self.env.produce_all_reports or ret.failed > 0):
                report_dir = self.env.report_dir + "." + \
                             ("successful" if ret.failed == 0 else "failed")
                try:
                    os.rename(self.env.report_dir, report_dir)
                except IOError:
                    pass
                print("A full report for each test can be found at {}".format(
                    os.path.relpath(report_dir)))
        return ret

    def _run_sequential(self, mode: str) -> RunResult:
        """Run the test cases of ``mode`` one after another."""
        failed = 0
        count = 0
        for test_case in self.test_cases[mode]:
            try:
                ret = self._run_test_case(test_case)
                if ret is False or not ret.is_correct():
                    failed += 1
                else:
                    self._add_correct_test_case(test_case)
                count += 1
            except KeyboardInterrupt:
                # Allow aborting a long run; report what was run so far.
                return RunResult(count, failed)
        return RunResult(count, failed)

    def _func(self, test_case: 'TestCase'):
        """Worker helper for the process pool: 0 on success, 1 on failure."""
        ret = self._run_test_case(test_case)
        if ret is not False and ret.is_correct():
            return 0, test_case
        return 1, test_case

    def _run_parallel(self, mode: str, parallel_jobs: int) -> RunResult:
        """Run the test cases of ``mode`` in a pool of worker processes."""
        # Fix: use the pool as a context manager so it is closed and joined
        # instead of being leaked.
        with multiprocessing.Pool(parallel_jobs) as pool:
            rets = pool.map(self._func, self.test_cases[mode])
        result = RunResult(len(rets), sum(map(lambda x: x[0], rets)))
        for (suc, test_case) in rets:
            if suc == 0:
                self._add_correct_test_case(test_case)
        return result

    def _run_test_case(self, test_case: 'TestCase') -> Union[bool, 'TestResult']:
        """
        Run a single test case, print its result and store a report.

        :return: the TestResult, or False if running or reporting failed
        """
        try:
            ret = test_case.run()
            color = "green" if ret.is_correct() else "red"
            print(colored("[{result:7s}] {tc:40s}".format(
                result="SUCCESS" if ret.is_correct() else "FAIL",
                tc=test_case.name()), color, attrs=["bold"]) +
                  colored("" if ret.is_correct() else ret.short_message(), color))
            try:
                if self.env.produce_reports and (self.env.produce_all_reports or not ret.is_correct()):
                    if not exists(self.env.report_dir):
                        os.mkdir(self.env.report_dir)
                    rep_dir = join(self.env.report_dir, test_case.type)
                    if not exists(rep_dir):
                        try:
                            os.mkdir(rep_dir)
                        except IOError:
                            # Another worker may have created it concurrently.
                            pass
                    suffix = ".correct" if ret.is_correct() else ".incorrect"
                    ret.store_at(join(rep_dir, test_case.short_name() + suffix))
                    if self.env.output_incorrect_reports and not ret.is_correct():
                        print(colored("Report for failing test case {}".format(test_case.short_name()),
                                      "red", attrs=["bold"]))
                        print(colored(ret.long_message(), "red"))
                return ret
            except IOError:
                _LOG.exception("Caught i/o error while trying to store the report for '{}'"
                               .format(test_case.name()))
                return False
        except KeyboardInterrupt:
            raise
        except BaseException:
            _LOG.exception("At test case '{}'".format(test_case.short_name()))
            return False

    def store(self):
        """Persist the names of the correct test cases per mode."""
        for mode in self.correct_test_cases.keys():
            log_file = self._log_file_for_type(mode)
            try:
                try:
                    os.mkdir(os.path.dirname(log_file))
                except IOError:
                    pass
                with open(log_file, "w") as f:
                    f.write("\n".join(self.correct_test_cases[mode]))
            except IOError:
                _LOG.exception("Caught i/o error while storing {}".format(log_file))


class TestCase:
    """
    A single test case.

    Concrete subclasses register themselves in TEST_CASE_CLASSES and declare
    the file endings they accept (and reject).
    """

    # Registered test case classes per test mode.
    TEST_CASE_CLASSES = dict((k, []) for k in TEST_MODES)
    # File endings a subclass accepts.
    FILE_ENDINGS = []
    # File endings a subclass explicitly rejects.
    INVALID_FILE_ENDINGS = []

    def __init__(self, env: Environment, type: str, file: str, preprocessed_file: str):
        self.env = env
        self.type = type
        self.file = file
        self.preprocessed_file = preprocessed_file

    def should_succeed(self) -> bool:
        """Is this test case expected to succeed?"""
        raise NotImplementedError()

    def can_run(self, mode: str = "") -> bool:
        """
        Can this test case run under ``mode`` (default: the current env mode)?

        In CI mode, tests of later modes are also run when their expected
        outcome makes them meaningful for the current mode.
        """
        mode = mode or self.env.mode
        same_mode = self.type == mode
        types = TEST_MODES[TEST_MODES.index(self.env.mode):]
        if self.env.ci_testing:
            return same_mode or \
                   (self.type in types and self.should_succeed()) or \
                   (self.type not in types and not self.should_succeed())
        else:
            return same_mode

    def run(self) -> 'TestResult':
        """Execute the test case and return its result."""
        raise NotImplementedError()

    @classmethod
    def create_from_file(cls, env: Environment, mode: str, file: str,
                         preprocessed_file: str) -> Optional['TestCase']:
        """Create the matching TestCase for ``file``, or None if none matches."""
        if cls.has_valid_file_ending(env.mode, file):
            return cls._test_case_class_for_file(env.mode, file)(env, mode, file, preprocessed_file)
        return None

    def name(self) -> str:
        return "{}:{}".format(self.type, self.short_name())

    def short_name(self) -> str:
        raise NotImplementedError()

    @classmethod
    def _test_case_class_for_file(cls, type: str, file: str):
        """Return the registered class that accepts ``file``, or False."""
        for t in cls.TEST_CASE_CLASSES[type]:
            if any(file.endswith(e) for e in t.FILE_ENDINGS) and \
                    not any(file.endswith(e) for e in t.INVALID_FILE_ENDINGS):
                return t
        return False

    @classmethod
    def has_valid_file_ending(cls, type: str, file: str) -> bool:
        return cls._test_case_class_for_file(type, file) is not False


class TestResult:
    """Result of running a test case: its error code and derived verdict."""

    def __init__(self, test_case: TestCase, error_code: int):
        self.test_case = test_case
        self.error_code = error_code

    def is_correct(self) -> bool:
        """The run is correct when success/failure matches the expectation."""
        return self.succeeded() == self.test_case.should_succeed()

    def succeeded(self) -> bool:
        return self.error_code == 0

    def store_at(self, file: str):
        """Write the long report message into ``file``."""
        with open(file, "w") as f:
            print(self.long_message(), file=f)

    def short_message(self) -> str:
        raise NotImplementedError()

    def long_message(self) -> str:
        raise NotImplementedError()


class BasicTestResult(TestResult):
    """Test result that captures output, error output and a report text."""

    def __init__(self, test_case: TestCase, error_code: int, output: str = None,
                 error_output: str = None, incorrect_msg: str = "incorrect return code"):
        super().__init__(test_case, error_code)
        self._incorrect_msg = incorrect_msg
        # A failing run only counts as correct if it reported an error.
        self._contains_error_str = error_output is not None and "error" in error_output
        self.error_output = error_output
        self.output = output
        self.other_texts = []  # type: List[Tuple[str, str, bool]]
        if output:
            self.add_additional_text("Output", output)
        if error_output:
            self.add_additional_text("Error output", error_output)

    def is_correct(self):
        if self.succeeded():
            return super().is_correct()
        else:
            return super().is_correct() and self._contains_error_str

    def short_message(self) -> str:
        if self.is_correct():
            return "correct"
        else:
            if not self.succeeded() and not self.test_case.should_succeed() \
                    and not self._contains_error_str:
                return "the error output doesn't contain the word \"error\""
            return self._incorrect_msg

    def long_message(self) -> str:
        """Full report: verdict, numbered source listing, return code, texts."""
        with open(self.test_case.preprocessed_file, "r") as f:
            file_content = [line.rstrip() for line in f]
        others = []
        # NOTE(review): the exact newline layout of these report templates was
        # reconstructed from a whitespace-mangled source — confirm against a
        # stored report.
        for title, content, long_text in self.other_texts:
            if long_text:
                others.append("""
{}:
{}
""".format(title, self._ident(content)))
            else:
                others.append("""{}:
{}\n""".format(title, content))
        return """{}
Source file:
{}
Return code: {}
{}
""".format(self.short_message().capitalize(), self._ident(file_content),
           self.error_code, "\n".join(others))

    def add_additional_text(self, title: str, content: str):
        """Append a multi-line section (indented with line numbers) to the report."""
        self.other_texts.append((title, content, True))

    def add_additional_text_line(self, title: str, content: str):
        """Append a single-line section to the report."""
        self.other_texts.append((title, content, False))

    def _ident(self, text: Union[str, List[str]]) -> str:
        """Prefix every line of ``text`` with a 4-digit line number."""
        arr = text if isinstance(text, list) else text.split("\n")
        if len(arr) == 0 or text == "":
            return ""
        arr = ["[{:04d}] {:s}".format(i + 1, l) for (i, l) in enumerate(arr)]
        return "\n".join(arr)


class BasicDiffTestResult(BasicTestResult):
    """Test result that additionally compares the output with an expected one."""

    def __init__(self, test_case: TestCase, error_code: int, output: str,
                 error_output: str, expected_output: str, short_error_message: str = None):
        super().__init__(test_case, error_code, output, error_output)
        self.expected_output = expected_output
        # Compare ignoring surrounding whitespace.
        self._is_output_correct = self.expected_output.strip() == self.output.strip()
        if self.is_correct():
            self.add_additional_text("Expected and actual output", self.output)
        elif self.succeeded() and self.test_case.should_succeed():
            self.add_additional_text("Diff[expected output, actual output]", self._output_diff())
            self.add_additional_text("Expected output", self.expected_output)
        # Assigned unconditionally: short_message() reads it for every result.
        self.short_error_message = short_error_message
        # self.add_additional_text("Actual output", self.output)

    def is_correct(self):
        if self.succeeded():
            return super().is_correct() and self.is_output_correct()
        else:
            return super().is_correct() and self._contains_error_str

    def _output_diff(self) -> str:
        """Line diff between expected and actual output."""
        return "".join(difflib.Differ().compare(self.expected_output.splitlines(True),
                                                self.output.splitlines(True)))

    def is_output_correct(self) -> bool:
        # Fix: annotation was "-> str" although a bool is returned.
        return self._is_output_correct

    def short_message(self) -> str:
        if self.is_correct():
            return "correct"
        else:
            if self.short_error_message:
                return self.short_error_message
            if not self.succeeded() and not self.test_case.should_succeed() \
                    and not self._contains_error_str:
                return "the error output doesn't contain the word \"error\""
            if self.succeeded() and self.test_case.should_succeed():
                return "the actual output differs from the expected"
            return "incorrect return code"


class DiffTest(TestCase):
    """Test case that diffs the tool output against a ``.out`` file."""

    FILE_ENDINGS = [".invalid.mj", ".valid.mj", ".mj"]
    OUTPUT_FILE_ENDING = ".out"
    MODE = TestMode.ast

    def __init__(self, env: Environment, type: str, file: str, preprocessed_file: str):
        super().__init__(env, type, file, preprocessed_file)
        # ".invalid.mj" files are expected to be rejected.
        self._should_succeed = not file.endswith(".invalid.mj")
        self._expected_output_file = file + self.OUTPUT_FILE_ENDING
        self._has_expected_output_file = exists(self._expected_output_file)

    def should_succeed(self) -> bool:
        return self._should_succeed

    def short_name(self) -> str:
        return basename(self.file)

    def run(self) -> BasicDiffTestResult:
        out, err, rtcode = self.env.run_mj_command(self.MODE, self.file)
        exp_out = ""
        if rtcode == 0 and self.should_succeed():
            if self._has_expected_output_file and self.type == self.MODE \
                    and self.env.mode == self.MODE:
                with open(self._expected_output_file, "r") as f:
                    exp_out = f.read()
            # else:
            #    _LOG.error("Expected output file for test case {}:{} is missing."
            #               .format(self.MODE, self.short_name()))
        if self.type == self.MODE and self.env.mode == self.MODE:
            return BasicDiffTestResult(self, rtcode, out.decode(), err.decode(), exp_out)
        # Borrowed tests of other modes only check the return code.
        return BasicTestResult(self, rtcode, out.decode(), err.decode())


class LexerDiffTest(DiffTest):
    """Diff test for the lexer mode."""

    MODE = TestMode.lexer


TestCase.TEST_CASE_CLASSES[TestMode.lexer].append(LexerDiffTest)

# Imported for their side effect: they register further test case classes.
import mjtest.test.syntax_tests
import mjtest.test.ast_tests
import mjtest.test.semantic_tests
import mjtest.test.exec_tests