import logging import os import random import shutil import tempfile from datetime import datetime import time from mjtest.util.shell import execute from mjtest.util.utils import get_mjtest_basedir, force_colored_output import mjtest.util.utils from typing import Tuple, List from preproc.preproc.preprocessor import PreProcessor, PreProcessorError, is_importable_file _LOG = logging.getLogger("env") class TestMode: lexer = "lexer" syntax = "syntax" ast = "ast" semantic = "semantic" comile_firm = "compile-firm" exec = "exec" USE_TESTS_OF_OTHER = { ast: [syntax] } """ All 'success' tests of the n.th mode can used as 'success' tests for the n-1.th mode""" TEST_MODES = [TestMode.lexer, TestMode.syntax, TestMode.ast, TestMode.semantic, TestMode.comile_firm, TestMode.exec] class Environment: LOG_LEVELS = { "info": logging.INFO, "error": logging.ERROR, "warn": logging.WARN, "debug": logging.DEBUG } def __init__(self, mode, mj_run: str, tmp_dir: str = "", test_dir: str = "", only_incorrect_tests: bool = False, parallel: bool = False, timeout: int = 30, report_dir: str = "", log_level: str = "warn", produce_no_reports: bool = True, output_no_incorrect_reports: bool = False, produce_all_reports: bool = False, report_subdir: str = None, ci_testing: bool = False, color: bool = False): if color: force_colored_output() self.mode = mode self.mj_run_cmd = os.path.realpath(mj_run) if tmp_dir: self.own_tmp_dir = True self.tmp_dir = os.path.abspath(os.path.expandvars(tmp_dir)) if not os.path.exists(tmp_dir): os.mkdir(self.tmp_dir) else: self.own_tmp_dir = False self.tmp_dir = tempfile.mkdtemp("mjtest") if test_dir: self.test_dir = os.path.abspath(os.path.realpath(test_dir)) else: self.test_dir = os.path.join(get_mjtest_basedir(), "tests") if not os.path.exists(self.test_dir): os.mkdir(self.test_dir) for d in TEST_MODES: os.mkdir(os.path.join(self.test_dir, d)) self.only_incorrect_tests = only_incorrect_tests self.parallel = parallel self.timeout = timeout if not produce_no_reports: if report_dir: self.report_dir = os.path.abspath(os.path.expandvars(report_dir)) else: self.report_dir = os.path.join(get_mjtest_basedir(), "reports") try: os.mkdir(self.report_dir) except IOError: pass self.report_dir = os.path.join(self.report_dir, report_subdir or datetime.now().strftime("%d-%m-%y_%H-%M-%S")) else: self.report_dir = None logging.basicConfig(level=self.LOG_LEVELS[log_level]) self.produce_reports = not produce_no_reports # type: bool self.output_incorrect_reports = not output_no_incorrect_reports self.produce_all_reports = produce_all_reports self.ci_testing = ci_testing self._tmp_file_ctr = 0 self._already_preprocessed_files = set() def create_tmpfile(self) -> str: self._tmp_file_ctr += 1 return os.path.join(self.tmp_dir, str(round(time.time() * 100000)) + str(random.randrange(0, 10000, 1)) + str(self._tmp_file_ctr)) def create_tmpdir(self) -> str: dir = self.create_tmpfile() os.mkdir(dir) return dir def clean_up(self): if not self.own_tmp_dir: shutil.rmtree(self.tmp_dir) def run_mj_command(self, mode: str, *args: Tuple[str]) -> Tuple[bytes, bytes, int]: """ Execute the MiniJava `run` script with the given arguments. :param args: arguments for the MiniJava `run` script :return: (out, err, return code) """ mode_flag = { TestMode.lexer: "--lextest", TestMode.syntax: "--parsetest", TestMode.ast: "--print-ast", TestMode.semantic: "--check", TestMode.comile_firm: "--compile-firm" }[mode] cmd = [self.mj_run_cmd, mode_flag] + list(args) return execute(cmd, timeout=self.timeout) def run_command(self, cmd: str, *args: Tuple[str]) -> Tuple[bytes, bytes, int]: """ Execute the passend command with its arguments :return: (out, err, return code) """ return execute([cmd] + list(args), timeout=self.timeout) def has_to_preprocess(self, file: str) -> bool: return os.path.relpath(file, self.test_dir).startswith("exec") def is_lib_file(self, file: str) -> bool: return is_importable_file(file) def preprocess(self, file: str) -> str: """ Pre process the passed file if needed and return the resulting file """ if not self.has_to_preprocess(file): return file if ".preprocessed" in os.path.relpath(file, self.test_dir): return file import_base_dir = os.path.join(self.test_dir, os.path.relpath(file, self.test_dir).lstrip(os.sep).split(os.sep)[0]) dst_dir = os.path.join(import_base_dir, ".preprocessed", os.path.split(os.path.relpath(file, import_base_dir))[0]) dst_file = os.path.join(dst_dir, os.path.basename(file)) if dst_file.endswith(".mj"): dst_file = dst_file.replace(".mj", ".java") if dst_file in self._already_preprocessed_files: return dst_file self._already_preprocessed_files.add(dst_file) if os.path.exists(dst_file) and os.path.isfile(dst_file) and os.path.getmtime(file) < os.path.getmtime(dst_file) and False: _LOG.debug("File '{}' already exists in a pre processed form".format(os.path.relpath(file))) return dst_file cur = os.path.split(dst_file)[0] while not os.path.exists(cur): os.mkdir(cur) cur = os.path.split(cur)[0] if not os.path.exists(dst_dir): os.mkdir(dst_dir) try: PreProcessor(file, import_base_dir, dst_file).preprocess() except: _LOG.exception("Pre processing file '{}'".format(file)) raise return dst_file