Commit 9254fdd4 authored by Johannes Bechberger's avatar Johannes Bechberger

Initil commit

parents
Copyright (c) 2016 mj3-16 Team (and contributers)
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
recursive-include mjtest *.py
README.mdwn
LICENCE
MJTest
======
A test runner (and suite) for the MiniJava compiler (and its parts) written in the compiler lab of the KIT.
It's heavily inspired by Sisyphus (and uses some of its code)
__Please contribute to the test cases__
Test modes
----------
The test cases are divided in 3 'modes':
- __syntax__: Test cases that just check whether `./run --parsecheck` accepts as correct or rejects
them.
- __semantic__: Test cases that check semantic checking of MiniJava programs
- __exec__: Test cases that check the correct compilation of MiniJava programs.
_Only the syntax mode is currently usable, but the other three will follow._
The test different test cases for each mode are located in a folder with the same name.
The default directory that contains all test folders is `tests`.
The different types a test cases are differentiated by their file endings.
Test types for the syntax mode
------------------------------
<table>
<tr><th>File ending(s) of test cases</th><th>Expected behaviour to complete a test of this type</th></tr>
<tr>
<td><code>.valid.mj</code><code>.mj</code>
<td>Return code is <code>0</code>, i.e. the MiniJava is accepted as syntactically correct</td>
</tr>
<tr>
<td><code>.invalid.mj</code>
<td>Return code is <code>&gt; 0</code> and the error output contains the word <code>error</code></td>
</tr>
</table>
Test runner
-----------
### Requirements
The following programs are required (and executable by simply calling their names).
- `python3` (at least Python3.3)
- `javac` and `java`
### Installation
Just clone `mjtest` and install it via `pip3`.
```sh
git clone https://github.com/mj3-16/mjtest
cd mjtest
sudo pip3 install .
```
### Usage
Output of the `mjtest --help`
```
usage: mjtest [-h] [--tmp_dir] [--test_dir] [--only_incorrect_tests]
[--parallel] [--timeout] [--report_dir] [--log LOG]
{syntax,semantic,exec} MJ_RUN_CMD
MiniJava test runner
positional arguments:
{syntax,semantic,exec}
What do you want to test?
MJ_RUN_CMD Command to run your MiniJava implementation, e.g.
`mj/run`
optional arguments:
-h, --help show this help message and exit
--tmp_dir Used temporary directory
--test_dir Directory that contains all test cases, default is the
'tests' directory
--only_incorrect_tests
Only run the tests that were incorrect the last run
--parallel Run the tests in parallel
--timeout Abort a program after TIMEOUT seconds
--report_dir Directory to store the reports in, default is
'reports'
--log LOG Logging level (error, warn, info or debug)
```
### Example usage
Assuming you want to run the syntax tests and your MiniJava base folder is `~/code/mj` then run
```
mjtest syntax `~/code/mj/run --lextest`
```
This will…
- … create reports in a folder named after the current date and time inside the `reports` folder
- … output something like
```
```
- … log that some test cases were executed correctly
- … return with an error code of `0` if all tests executed correct
Contributions
-------------
__Please contribute to this test runner and the accompanied test cases.__
To add test cases just open a pull request. The test cases must have unique names (in each mode folder).
Licence
-------
MIT, see LICENCE file for more information.
VERSION = "0.42"
\ No newline at end of file
import logging
from pprint import pprint
import sys
import mjtest.util.utils
import argparse
from mjtest.environment import TestMode, Environment, TEST_MODES
from mjtest.test.tests import TestSuite
# adapted from http://stackoverflow.com/a/8527629
class LogLevelChoices(argparse.Action):
CHOICES = ["error", "warn", "info", "debug"]
def __call__(self, parser, namespace, values, option_string=None):
if values:
for value in values:
if value not in self.CHOICES:
message = ("invalid choice: {0!r} (choose from {1})"
.format(value,
', '.join([repr(action)
for action in self.CHOICES])))
raise argparse.ArgumentError(self, message)
setattr(namespace, self.dest, values)
if True:#__name__ == '__main__':
parser = argparse.ArgumentParser(description="MiniJava test runner", add_help=True)
parser.add_argument("mode",
choices=TEST_MODES,
help="What do you want to test?")
parser.add_argument("mj_run_cmd",
metavar="MJ_RUN_CMD",
help="Command to run your MiniJava implementation, e.g. `mj/run`")
parser.add_argument("--tmp_dir", action="store_const", default="", const="tmp_dir",
help="Used temporary directory")
parser.add_argument("--test_dir", action="store_const", default="", const="test_dir",
help="Directory that contains all test cases, default is the 'tests' directory")
parser.add_argument("--only_incorrect_tests", action="store_true", default=False,
help="Only run the tests that were incorrect the last run")
parser.add_argument("--parallel", action="store_true", default=False,
help="Run the tests in parallel")
parser.add_argument("--timeout", action="store_const", default=30, const="timeout",
help="Abort a program after TIMEOUT seconds")
parser.add_argument("--report_dir", action="store_const", default="", const="report_dir",
help="Directory to store the reports in, default is 'reports'")
parser.add_argument("--log_level", action=LogLevelChoices, default="warn", const="log_level",
help="Logging level (error, warn, info or debug)")
args = parser.parse_args()
suite = TestSuite(Environment(**vars(args)))
ret = None
try:
ret = suite.run()
finally:
suite.env.clean_up()
suite.store()
if ret is None or ret:
sys.exit(1)
else:
sys.exit(0)
import logging
import os
import shutil
import tempfile
from datetime import datetime, time
from mjtest.util.shell import execute
from mjtest.util.utils import get_mjtest_basedir
import humanfriendly as hf
from typing import Tuple, List
class TestMode:
syntax = "syntax"
semantic = "semantic"
exec = "exec"
""" All 'success' tests of the n.th mode can used as 'success' tests for the n-1.th mode"""
TEST_MODES = [TestMode.syntax, TestMode.semantic, TestMode.exec]
class Environment:
LOG_LEVELS = {
"info": logging.INFO,
"error": logging.ERROR,
"warn": logging.WARN,
"debug": logging.DEBUG
}
def __init__(self, mode, mj_run_cmd: str, tmp_dir: str = "", test_dir: str = "",
only_incorrect_tests: bool = False, parallel: bool = False,
timeout: int = 30, report_dir: str = "", log_level: str = "warn"):
self.mode = mode
self.mj_run_cmd = os.path.realpath(mj_run_cmd)
if tmp_dir:
self.own_tmp_dir = True
self.tmp_dir = os.path.abspath(os.path.expandvars(tmp_dir))
if not os.path.exists(tmp_dir):
os.mkdir(self.tmp_dir)
else:
self.own_tmp_dir = False
self.tmp_dir = tempfile.mkdtemp("mjtest")
if test_dir:
self.test_dir = os.path.abspath(os.path.realpath(test_dir))
else:
self.test_dir = os.path.join(get_mjtest_basedir(), "tests")
if not os.path.exists(self.test_dir):
os.mkdir(self.test_dir)
for d in [TestMode.syntax, TestMode.semantic, TestMode.exec]:
os.mkdir(os.path.join(self.test_dir, d))
self.only_incorrect_tests = only_incorrect_tests
self.parallel = parallel
self.timeout = timeout
if tmp_dir:
self.report_dir = os.path.abspath(os.path.expandvars(report_dir))
if not os.path.exists(report_dir):
os.mkdir(self.report_dir)
else:
self.report_dir = os.path.join(get_mjtest_basedir(), "reports")
if not os.path.exists(self.report_dir):
os.mkdir(self.report_dir)
self.report_dir = os.path.join(self.report_dir, datetime.now().strftime("%d.%m.%y:%X"))
os.mkdir(self.report_dir)
logging.basicConfig(level=self.LOG_LEVELS[log_level])
def create_tmpfile(self) -> str:
return os.path.join(self.tmp_dir, os.times())
def clean_up(self):
if not self.own_tmp_dir:
shutil.rmtree(self.tmp_dir)
def run_mj_command(self, *args: Tuple[str]) -> Tuple[bytes, bytes, int]:
"""
Execute the MiniJava `run` script with the given arguments.
:param args: arguments for the MiniJava `run` script
:return: (out, err, return code)
"""
cmd = [self.mj_run_cmd] + list(args)
return execute(cmd, timeout=self.timeout)
\ No newline at end of file
__author__ = 'parttimenerd'
from mjtest.environment import Environment, TestMode
from mjtest.test.tests import TestCase, TestResult, BasicTestResult
from os import path
class BasicSyntaxTest(TestCase):
FILE_ENDINGS = [".invalid.mj", ".valid.mj", ".mj"]
def __init__(self, env: Environment, type: str, file: str):
super().__init__(env, type, file)
self._should_succeed = not file.endswith(".invalid.mj")
def should_succeed(self) -> bool:
return self._should_succeed
def short_name(self) -> str:
return path.basename(self.file)[:-3]
def run(self) -> TestResult:
out, err, rtcode = self.env.run_mj_command("--parsetest ", self.file)
return BasicTestResult(self, rtcode, out.decode(), err.decode())
TestCase.TEST_CASE_CLASSES[TestMode.syntax].append(BasicSyntaxTest)
from collections import namedtuple
import shutil
from typing import Optional, List, Tuple, T, Union
from mjtest.environment import Environment, TestMode, TEST_MODES
from os.path import join, exists, basename
import logging
import os
import multiprocessing
from mjtest.util.parallelism import available_cpu_count
from termcolor import cprint, colored
from pprint import pprint
import shutil
_LOG = logging.getLogger("tests")
RunResult = namedtuple("RunResult", ['count', 'failed'])
class TestSuite:
"""
The whole set of tests.
"""
def __init__(self, env: Environment):
self.env = env
self.test_cases = {} # type: Dict[str, List[TestCase]]
self.correct_test_cases = {} # type: Dict[str, List[str]]
self._load_test_cases()
def _load_test_cases(self):
types = TEST_MODES[TEST_MODES.index(self.env.mode):]
for type in types:
self._load_test_case_type(type)
def _load_test_case_type(self, type: str):
dir = join(self.env.test_dir, type)
if exists(dir):
self._load_test_case_dir(type, dir)
else:
_LOG.warn("Test folder {} doesn't exist".format(dir))
def _load_test_case_dir(self, mode: str, dir: str):
self.test_cases[mode] = []
correct_test_cases = set()
log_file = self._log_file_for_type(mode)
if exists(log_file):
with open(log_file) as f:
correct_test_cases = set(f.readlines())
for file in sorted(os.listdir(dir)):
if not TestCase.has_valid_file_ending(mode, file):
_LOG.debug("Skip file " + file)
elif self.env.only_incorrect_tests and file in correct_test_cases:
_LOG.info("Skip file {} as its test case was executed correctly the last run")
else:
test_case = TestCase.create_from_file(self.env, self.env.mode, join(dir, file))
if not test_case.can_run():
_LOG.debug("Skip test case '{}' because it isn't suited".format(test_case.name()))
else:
self.test_cases[mode].append(test_case)
if len(self.test_cases[mode]) == 0:
del self.test_cases[mode]
def _log_file_for_type(self, type: str):
return join(self.env.test_dir, type, ".mjtest_correct_testcases")
def _add_correct_test_case(self, test_case: 'TestCase'):
self.correct_test_cases[test_case.type].append(basename(test_case.file))
def run(self) -> RunResult:
ret = RunResult(0, 0)
try:
for mode in self.test_cases.keys():
single_ret = RunResult(0, 0)
if self.env.parallel:
single_ret = self._run_parallel(mode, available_cpu_count())
else:
single_ret = self._run_sequential(mode)
ret = RunResult(ret.count + single_ret.count, ret.failed + single_ret.failed)
except BaseException:
logging.exception("")
finally:
print("-" * 40)
if ret.failed > 0: # some tests failed
print(colored("Ran {} tests, of which ".format(ret.count), "red") +
colored("{} failed.".format(ret.failed), "red", attrs=["bold"]))
else:
cprint("All {} run tests succeeded".format(ret.count), "green")
report_dir = self.env.report_dir + "." + ("successful" if ret.failed == 0 else "failed")
os.rename(self.env.report_dir, report_dir)
print("A full report for each test can be found at {}".format(
os.path.relpath(report_dir)))
return ret
def _run_sequential(self, mode: str) -> RunResult:
failed = 0
count = 0
for test_case in self.test_cases[mode]:
ret = self._run_test_case(test_case)
if ret is False or not ret.is_correct():
failed += 1
else:
self._add_correct_test_case(test_case)
count += 1
return RunResult(count, failed)
def _func(self, test_case: 'TestCase'):
ret = self._run_test_case(test_case)
if ret.is_correct():
return 0, [test_case]
return 1, []
def _run_parallel(self, mode: str, parallel_jobs: int) -> RunResult:
pool = multiprocessing.Pool(parallel_jobs)
rets = pool.map(self._func, self.test_cases[mode])
return RunResult(len(rets), sum(map(lambda x: x[0], rets)))
def _run_test_case(self, test_case: 'TestCase') -> Optional['TestResult']:
try:
ret = test_case.run()
color = "green" if ret.is_correct() else "red"
print(colored("[{result:7s}] {tc:40s}".format(
result="SUCCESS" if ret.is_correct() else "FAIL",
tc=test_case.name()), color, attrs=["bold"]) +
colored("" if ret.is_correct() else ret.short_message(), color))
try:
if not exists(self.env.report_dir):
os.mkdir(self.env.report_dir)
rep_dir = join(self.env.report_dir, test_case.type)
if not exists(rep_dir):
os.mkdir(rep_dir)
suffix = ".correct" if ret.is_correct() else ".incorrect"
ret.store_at(join(rep_dir, test_case.short_name() + suffix))
return ret
except IOError:
_LOG.exception("Caught i/o error while trying to store the report for '{}'"
.format(test_case.name()))
return False
except KeyboardInterrupt:
raise
except BaseException:
_LOG.exception("At test case '{}'".format(test_case.short_name()))
return False
def store(self):
for mode in self.correct_test_cases.keys():
log_file = self._log_file_for_type(mode)
try:
with open(log_file, "w") as f:
f.write("\n".join(self.correct_test_cases[mode]))
except IOError as e:
_LOG.exception("Caught i/o error while storing {}".format(log_file))
class TestCase:
"""
A single test case.
"""
TEST_CASE_CLASSES = {
TestMode.syntax: []
}
FILE_ENDINGS = []
def __init__(self, env: Environment, type: str, file: str):
self.env = env
self.type = type
self.file = file
def should_succeed(self) -> bool:
raise NotImplementedError()
def can_run(self, mode: str = "") -> bool:
mode = mode or self.env.mode
if mode == TestMode.exec:
return self.type == TestMode.exec
if mode == TestMode.semantic:
return self.type == TestMode.semantic \
or (self.type == TestMode.exec and self.should_succeed())
if mode == TestMode.syntax:
return self.type == TestMode.syntax or \
(self.can_run(TestMode.semantic) and self.should_succeed())
def run(self) -> 'TestResult':
raise NotImplementedError()
@classmethod
def create_from_file(cls, env: Environment, mode: str, file: str) -> Optional['TestCase']:
if cls.has_valid_file_ending(mode, file):
return cls._test_case_class_for_file(mode, file)(env, mode, file)
return None
def name(self):
return "{}:{}".format(self.type, self.short_name())
def short_name(self) -> str:
raise NotImplementedError()
@classmethod
def _test_case_class_for_file(cls, type: str, file: str):
for t in cls.TEST_CASE_CLASSES[type]:
if any(file.endswith(e) for e in t.FILE_ENDINGS):
return t
return False
@classmethod
def has_valid_file_ending(cls, type: str, file: str):
return cls._test_case_class_for_file(type, file) != False
class TestResult:
def __init__(self, test_case: TestCase, error_code: int):
self.test_case = test_case
self.error_code = error_code
def is_correct(self) -> bool:
return self.succeeded() == self.test_case.should_succeed()
def succeeded(self) -> bool:
return self.error_code == 0
def store_at(self, file: str):
with open(file, "w") as f:
print(self.long_message(), file=f)
def short_message(self) -> str:
raise NotImplementedError()
def long_message(self) -> str:
raise NotImplementedError()
class BasicTestResult(TestResult):
def __init__(self, test_case: TestCase, error_code: int, error_output: str, output: str):
super().__init__(test_case, error_code)
self._contains_error_str = "error" in error_output
self.error_output = error_output
self.output = output
def is_correct(self):
if self.succeeded():
return super().is_correct()
else:
return super().is_correct() and self._contains_error_str
def short_message(self) -> str:
if self.is_correct():
return "correct"
else:
if not self.succeeded() and not self.test_case.should_succeed() and not self._contains_error_str:
return "the error output doesn't contain the word \"error\""
return "incorrect return code"
def long_message(self) -> str:
file_content = ""
with open(self.test_case.file, "r") as f:
file_content = f.readlines()
return """{}
Source file:
{}
Output:
{}
Error output:
{}
Return code: {}
""".format(self.short_message().capitalize(), self._ident(file_content),
self._ident(self.output), self._ident(self.error_output), self.error_code)
def _ident(self, text: Union[str,List[str]]) -> str:
arr = text if isinstance(text, list) else text.split("\n")
return " " + "\n ".join(arr)
import mjtest.test.syntax_tests
\ No newline at end of file
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Execute computations asynchronously using threads or processes."""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
from base import (FIRST_COMPLETED,
FIRST_EXCEPTION,
ALL_COMPLETED,
CancelledError,
TimeoutError,
Future,
Executor,
wait,
as_completed)
from process import ProcessPoolExecutor
from thread import ThreadPoolExecutor
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
from __future__ import with_statement
import functools
import logging
import threading
import time
try:
from collections import namedtuple
except ImportError:
from concurrent.futures.compat import namedtuple
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
_AS_COMPLETED = '_AS_COMPLETED'
# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'
_FUTURE_STATES = [
PENDING,
RUNNING,
CANCELLED,
CANCELLED_AND_NOTIFIED,
FINISHED
]
_STATE_TO_DESCRIPTION_MAP = {
PENDING: "pending",
RUNNING: "running",
CANCELLED: "cancelled",
CANCELLED_AND_NOTIFIED: "cancelled",
FINISHED: "finished"
}
# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
STDERR_HANDLER = logging.StreamHandler()
LOGGER.addHandler(STDERR_HANDLER)
class Error(Exception):
"""Base class for all future-related exceptions."""
pass
class CancelledError(Error):
"""The Future was cancelled."""
pass
class TimeoutError(Error):
"""The operation exceeded the given deadline."""
pass
class _Waiter(object):
"""Provides the event that wait() and as_completed() block on."""
def __init__(self):
self.event = threading.Event()
self.finished_futures = []
def add_result(self, future):
self.finished_futures.append(future)
def add_exception(self, future):
self.finished_futures.append(future)
def add_cancelled(self, future):
self.finished_futures.append(future)
class _AsCompletedWaiter(_Waiter):
"""Used by as_completed()."""
def __init__(self):
super(_AsCompletedWaiter, self).__init__()
self.lock = threading.Lock()
def add_result(self, future):
with self.lock:
super(_AsCompletedWaiter, self).add_result(future)
self.event.set()
def add_exception(self, future):
with self.lock:
super(_AsCompletedWaiter, self).add_exception(future)
self.event.set()
def add_cancelled(self, future):
with self.lock:
super(_AsCompletedWaiter, self).add_cancelled(future)
self.event.set()
class _FirstCompletedWaiter(_Waiter):
"""Used by wait(return_when=FIRST_COMPLETED)."""
def add_result(self, future):
super(_FirstCompletedWaiter, self).add_result(future)
self.event.set()
def add_exception(self, future):
super(_FirstCompletedWaiter, self).add_exception(future)