+import os
+import threading
+import time
+import traceback
+try:
+ import Queue as queue
+except ImportError:
+ import queue
+
+try:
+ import win32api
+except ImportError:
+ win32api = None
+
+try:
+ import multiprocessing
+except ImportError:
+ multiprocessing = None
+
+import lit.Test
+
+###
+# Test Execution Implementation
+
+class LockedValue(object):
+ def __init__(self, value):
+ self.lock = threading.Lock()
+ self._value = value
+
+ def _get_value(self):
+ self.lock.acquire()
+ try:
+ return self._value
+ finally:
+ self.lock.release()
+
+ def _set_value(self, value):
+ self.lock.acquire()
+ try:
+ self._value = value
+ finally:
+ self.lock.release()
+
+ value = property(_get_value, _set_value)
+
+class TestProvider(object):
+ def __init__(self, tests, num_jobs, queue_impl, canceled_flag):
+ self.canceled_flag = canceled_flag
+
+ # Create a shared queue to provide the test indices.
+ self.queue = queue_impl()
+ for i in range(len(tests)):
+ self.queue.put(i)
+ for i in range(num_jobs):
+ self.queue.put(None)
+
+ def cancel(self):
+ self.canceled_flag.value = 1
+
+ def get(self):
+ # Check if we are canceled.
+ if self.canceled_flag.value:
+ return None
+
+ # Otherwise take the next test.
+ return self.queue.get()
+
+class Tester(object):
+ def __init__(self, run_instance, provider, consumer):
+ self.run_instance = run_instance
+ self.provider = provider
+ self.consumer = consumer
+
+ def run(self):
+ while True:
+ item = self.provider.get()
+ if item is None:
+ break
+ self.run_test(item)
+ self.consumer.task_finished()
+
+ def run_test(self, test_index):
+ test = self.run_instance.tests[test_index]
+ try:
+ self.run_instance.execute_test(test)
+ except KeyboardInterrupt:
+ # This is a sad hack. Unfortunately subprocess goes
+ # bonkers with ctrl-c and we start forking merrily.
+ print('\nCtrl-C detected, goodbye.')
+ os.kill(0,9)
+ self.consumer.update(test_index, test)
+
+class ThreadResultsConsumer(object):
+ def __init__(self, display):
+ self.display = display
+ self.lock = threading.Lock()
+
+ def update(self, test_index, test):
+ self.lock.acquire()
+ try:
+ self.display.update(test)
+ finally:
+ self.lock.release()
+
+ def task_finished(self):
+ pass
+
+ def handle_results(self):
+ pass
+
+class MultiprocessResultsConsumer(object):
+ def __init__(self, run, display, num_jobs):
+ self.run = run
+ self.display = display
+ self.num_jobs = num_jobs
+ self.queue = multiprocessing.Queue()
+
+ def update(self, test_index, test):
+ # This method is called in the child processes, and communicates the
+ # results to the actual display implementation via an output queue.
+ self.queue.put((test_index, test.result))
+
+ def task_finished(self):
+ # This method is called in the child processes, and communicates that
+ # individual tasks are complete.
+ self.queue.put(None)
+
+ def handle_results(self):
+ # This method is called in the parent, and consumes the results from the
+ # output queue and dispatches to the actual display. The method will
+ # complete after each of num_jobs tasks has signalled completion.
+ completed = 0
+ while completed != self.num_jobs:
+ # Wait for a result item.
+ item = self.queue.get()
+ if item is None:
+ completed += 1
+ continue
+
+ # Update the test result in the parent process.
+ index,result = item
+ test = self.run.tests[index]
+ test.result = result
+
+ self.display.update(test)
+
+def run_one_tester(run, provider, display):
+ tester = Tester(run, provider, display)
+ tester.run()
+
+###
+
class Run(object):
"""
This class represents a concrete, configured testing run.
def __init__(self, lit_config, tests):
self.lit_config = lit_config
self.tests = tests
+
+ def execute_test(self, test):
+ result = None
+ start_time = time.time()
+ try:
+ result = test.config.test_format.execute(test, self.lit_config)
+
+ # Support deprecated result from execute() which returned the result
+ # code and additional output as a tuple.
+ if isinstance(result, tuple):
+ code, output = result
+ result = lit.Test.Result(code, output)
+ elif not isinstance(result, lit.Test.Result):
+ raise ValueError("unexpected result from test execution")
+ except KeyboardInterrupt:
+ raise
+ except:
+ if self.lit_config.debug:
+ raise
+ output = 'Exception during script execution:\n'
+ output += traceback.format_exc()
+ output += '\n'
+ result = lit.Test.Result(lit.Test.UNRESOLVED, output)
+ result.elapsed = time.time() - start_time
+
+ test.setResult(result)
+
+ def execute_tests(self, display, jobs, max_time=None,
+ use_processes=False):
+ """
+ execute_tests(display, jobs, [max_time])
+
+ Execute each of the tests in the run, using up to jobs number of
+ parallel tasks, and inform the display of each individual result. The
+ provided tests should be a subset of the tests available in this run
+ object.
+
+ If max_time is non-None, it should be a time in seconds after which to
+ stop executing tests.
+
+ The display object will have its update method called with each test as
+ it is completed. The calls are guaranteed to be locked with respect to
+ one another, but are *not* guaranteed to be called on the same thread as
+ this method was invoked on.
+
+ Upon completion, each test in the run will have its result
+ computed. Tests which were not actually executed (for any reason) will
+ be given an UNRESOLVED result.
+ """
+
+ # Choose the appropriate parallel execution implementation.
+ consumer = None
+ if jobs != 1 and use_processes and multiprocessing:
+ try:
+ task_impl = multiprocessing.Process
+ queue_impl = multiprocessing.Queue
+ canceled_flag = multiprocessing.Value('i', 0)
+ consumer = MultiprocessResultsConsumer(self, display, jobs)
+ except:
+ # multiprocessing fails to initialize with certain OpenBSD and
+ # FreeBSD Python versions: http://bugs.python.org/issue3770
+ # Unfortunately the error raised also varies by platform.
+ self.lit_config.note('failed to initialize multiprocessing')
+ consumer = None
+ if not consumer:
+ task_impl = threading.Thread
+ queue_impl = queue.Queue
+ canceled_flag = LockedValue(0)
+ consumer = ThreadResultsConsumer(display)
+
+ # Create the test provider.
+ provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag)
+
+ # Install a console-control signal handler on Windows.
+ if win32api is not None:
+ def console_ctrl_handler(type):
+ provider.cancel()
+ return True
+ win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
+
+ # Install a timeout handler, if requested.
+ if max_time is not None:
+ def timeout_handler():
+ provider.cancel()
+ timeout_timer = threading.Timer(max_time, timeout_handler)
+ timeout_timer.start()
+
+ # If not using multiple tasks, just run the tests directly.
+ if jobs == 1:
+ run_one_tester(self, provider, consumer)
+ else:
+ # Otherwise, execute the tests in parallel
+ self._execute_tests_in_parallel(task_impl, provider, consumer, jobs)
+
+ # Cancel the timeout handler.
+ if max_time is not None:
+ timeout_timer.cancel()
+
+ # Update results for any tests which weren't run.
+ for test in self.tests:
+ if test.result is None:
+ test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
+
+ def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
+ # Start all of the tasks.
+ tasks = [task_impl(target=run_one_tester,
+ args=(self, provider, consumer))
+ for i in range(jobs)]
+ for t in tasks:
+ t.start()
+
+ # Allow the consumer to handle results, if necessary.
+ consumer.handle_results()
+
+ # Wait for all the tasks to complete.
+ for t in tasks:
+ t.join()