Lit: Rework r249161; Move RLIMIT_NPROC to main.py.
[oota-llvm.git] / utils / lit / lit / run.py
index 8a9f87b2014ed3b9402768359f69a0c845c252c2..27c414d6dd65e6525cc728ac9a98d90e76931f2c 100644 (file)
@@ -1,3 +1,155 @@
+import os
+import threading
+import time
+import traceback
+try:
+    import Queue as queue
+except ImportError:
+    import queue
+
+try:
+    import win32api
+except ImportError:
+    win32api = None
+
+try:
+    import multiprocessing
+except ImportError:
+    multiprocessing = None
+
+import lit.Test
+
+###
+# Test Execution Implementation
+
+class LockedValue(object):
+    def __init__(self, value):
+        self.lock = threading.Lock()
+        self._value = value
+
+    def _get_value(self):
+        self.lock.acquire()
+        try:
+            return self._value
+        finally:
+            self.lock.release()
+
+    def _set_value(self, value):
+        self.lock.acquire()
+        try:
+            self._value = value
+        finally:
+            self.lock.release()
+
+    value = property(_get_value, _set_value)
+
+class TestProvider(object):
+    def __init__(self, tests, num_jobs, queue_impl, canceled_flag):
+        self.canceled_flag = canceled_flag
+
+        # Create a shared queue to provide the test indices.
+        self.queue = queue_impl()
+        for i in range(len(tests)):
+            self.queue.put(i)
+        for i in range(num_jobs):
+            self.queue.put(None)
+
+    def cancel(self):
+        self.canceled_flag.value = 1
+
+    def get(self):
+        # Check if we are canceled.
+        if self.canceled_flag.value:
+          return None
+
+        # Otherwise take the next test.
+        return self.queue.get()
+
+class Tester(object):
+    def __init__(self, run_instance, provider, consumer):
+        self.run_instance = run_instance
+        self.provider = provider
+        self.consumer = consumer
+
+    def run(self):
+        while True:
+            item = self.provider.get()
+            if item is None:
+                break
+            self.run_test(item)
+        self.consumer.task_finished()
+
+    def run_test(self, test_index):
+        test = self.run_instance.tests[test_index]
+        try:
+            self.run_instance.execute_test(test)
+        except KeyboardInterrupt:
+            # This is a sad hack. Unfortunately subprocess goes
+            # bonkers with ctrl-c and we start forking merrily.
+            print('\nCtrl-C detected, goodbye.')
+            os.kill(0,9)
+        self.consumer.update(test_index, test)
+
+class ThreadResultsConsumer(object):
+    def __init__(self, display):
+        self.display = display
+        self.lock = threading.Lock()
+
+    def update(self, test_index, test):
+        self.lock.acquire()
+        try:
+            self.display.update(test)
+        finally:
+            self.lock.release()
+
+    def task_finished(self):
+        pass
+
+    def handle_results(self):
+        pass
+
+class MultiprocessResultsConsumer(object):
+    def __init__(self, run, display, num_jobs):
+        self.run = run
+        self.display = display
+        self.num_jobs = num_jobs
+        self.queue = multiprocessing.Queue()
+
+    def update(self, test_index, test):
+        # This method is called in the child processes, and communicates the
+        # results to the actual display implementation via an output queue.
+        self.queue.put((test_index, test.result))
+
+    def task_finished(self):
+        # This method is called in the child processes, and communicates that
+        # individual tasks are complete.
+        self.queue.put(None)
+
+    def handle_results(self):
+        # This method is called in the parent, and consumes the results from the
+        # output queue and dispatches to the actual display. The method will
+        # complete after each of num_jobs tasks has signalled completion.
+        completed = 0
+        while completed != self.num_jobs:
+            # Wait for a result item.
+            item = self.queue.get()
+            if item is None:
+                completed += 1
+                continue
+
+            # Update the test result in the parent process.
+            index,result = item
+            test = self.run.tests[index]
+            test.result = result
+
+            self.display.update(test)
+
+def run_one_tester(run, provider, display):
+    tester = Tester(run, provider, display)
+    tester.run()
+
+###
+
 class Run(object):
     """
     This class represents a concrete, configured testing run.
@@ -6,3 +158,120 @@ class Run(object):
     def __init__(self, lit_config, tests):
         self.lit_config = lit_config
         self.tests = tests
+
+    def execute_test(self, test):
+        result = None
+        start_time = time.time()
+        try:
+            result = test.config.test_format.execute(test, self.lit_config)
+
+            # Support deprecated result from execute() which returned the result
+            # code and additional output as a tuple.
+            if isinstance(result, tuple):
+                code, output = result
+                result = lit.Test.Result(code, output)
+            elif not isinstance(result, lit.Test.Result):
+                raise ValueError("unexpected result from test execution")
+        except KeyboardInterrupt:
+            raise
+        except:
+            if self.lit_config.debug:
+                raise
+            output = 'Exception during script execution:\n'
+            output += traceback.format_exc()
+            output += '\n'
+            result = lit.Test.Result(lit.Test.UNRESOLVED, output)
+        result.elapsed = time.time() - start_time
+
+        test.setResult(result)
+
+    def execute_tests(self, display, jobs, max_time=None,
+                      use_processes=False):
+        """
+        execute_tests(display, jobs, [max_time])
+
+        Execute each of the tests in the run, using up to jobs number of
+        parallel tasks, and inform the display of each individual result. The
+        provided tests should be a subset of the tests available in this run
+        object.
+
+        If max_time is non-None, it should be a time in seconds after which to
+        stop executing tests.
+
+        The display object will have its update method called with each test as
+        it is completed. The calls are guaranteed to be locked with respect to
+        one another, but are *not* guaranteed to be called on the same thread as
+        this method was invoked on.
+
+        Upon completion, each test in the run will have its result
+        computed. Tests which were not actually executed (for any reason) will
+        be given an UNRESOLVED result.
+        """
+
+        # Choose the appropriate parallel execution implementation.
+        consumer = None
+        if jobs != 1 and use_processes and multiprocessing:
+            try:
+                task_impl = multiprocessing.Process
+                queue_impl = multiprocessing.Queue
+                canceled_flag =  multiprocessing.Value('i', 0)
+                consumer = MultiprocessResultsConsumer(self, display, jobs)
+            except:
+                # multiprocessing fails to initialize with certain OpenBSD and
+                # FreeBSD Python versions: http://bugs.python.org/issue3770
+                # Unfortunately the error raised also varies by platform.
+                self.lit_config.note('failed to initialize multiprocessing')
+                consumer = None
+        if not consumer:
+            task_impl = threading.Thread
+            queue_impl = queue.Queue
+            canceled_flag = LockedValue(0)
+            consumer = ThreadResultsConsumer(display)
+
+        # Create the test provider.
+        provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag)
+
+        # Install a console-control signal handler on Windows.
+        if win32api is not None:
+            def console_ctrl_handler(type):
+                provider.cancel()
+                return True
+            win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
+
+        # Install a timeout handler, if requested.
+        if max_time is not None:
+            def timeout_handler():
+                provider.cancel()
+            timeout_timer = threading.Timer(max_time, timeout_handler)
+            timeout_timer.start()
+
+        # If not using multiple tasks, just run the tests directly.
+        if jobs == 1:
+            run_one_tester(self, provider, consumer)
+        else:
+            # Otherwise, execute the tests in parallel
+            self._execute_tests_in_parallel(task_impl, provider, consumer, jobs)
+
+        # Cancel the timeout handler.
+        if max_time is not None:
+            timeout_timer.cancel()
+
+        # Update results for any tests which weren't run.
+        for test in self.tests:
+            if test.result is None:
+                test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
+
+    def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
+        # Start all of the tasks.
+        tasks = [task_impl(target=run_one_tester,
+                           args=(self, provider, consumer))
+                 for i in range(jobs)]
+        for t in tasks:
+            t.start()
+
+        # Allow the consumer to handle results, if necessary.
+        consumer.handle_results()
+
+        # Wait for all the tasks to complete.
+        for t in tasks:
+            t.join()