16 import multiprocessing
18 multiprocessing = None
23 # Test Execution Implementation
# Wraps a value together with a threading.Lock and exposes it through the
# `value` property declared at the end of the class.
25 class LockedValue(object):
26 def __init__(self, value):
# Lock owned by this instance.
# NOTE(review): presumably held by both accessors while touching the stored
# value -- the accessor bodies are not shown here; confirm.
27 self.lock = threading.Lock()
# Setter half of the `value` property.
37 def _set_value(self, value):
# Route attribute-style reads and writes of `.value` through the accessors.
# NOTE(review): `_get_value` is defined on lines not shown here.
44 value = property(_get_value, _set_value)
# Supplies work items to the test-running tasks from a shared queue and
# carries the shared cancellation flag.
46 class TestProvider(object):
# tests:         sequence of tests; only its length is used in the lines shown.
# num_jobs:      number of parallel jobs.
# queue_impl:    zero-argument callable returning the queue object to use.
# canceled_flag: object whose `value` attribute is truthy once canceled.
47 def __init__(self, tests, num_jobs, queue_impl, canceled_flag):
48 self.canceled_flag = canceled_flag
50 # Create a shared queue to provide the test indices.
51 self.queue = queue_impl()
# One iteration per test.
# NOTE(review): loop body not shown -- presumably enqueues index i; confirm.
52 for i in range(len(tests)):
# One iteration per job.
# NOTE(review): loop body not shown -- presumably enqueues one end-of-work
# marker per job so every task eventually stops; confirm.
54 for i in range(num_jobs):
# Raise the shared cancellation flag; it is read back in the check below.
58 self.canceled_flag.value = 1
61 # Check if we are canceled.
62 if self.canceled_flag.value:
65 # Otherwise take the next test.
# NOTE(review): with the queue types passed by execute_tests (queue.Queue /
# multiprocessing.Queue) a bare get() blocks until an item is available.
66 return self.queue.get()
# Worker object: takes items from `provider`, executes tests through
# `run_instance`, and reports to `consumer`.
# NOTE(review): the enclosing class statement is not shown; run_one_tester
# constructs this object as Tester(run, provider, display).
69 def __init__(self, run_instance, provider, consumer):
70 self.run_instance = run_instance
71 self.provider = provider
72 self.consumer = consumer
# Fetch the next work item from the provider.
# NOTE(review): the surrounding method header and loop are not shown.
76 item = self.provider.get()
# Signal to the consumer that this task has finished.
80 self.consumer.task_finished()
# Execute the test stored at `test_index` in run_instance.tests and hand it,
# together with its index, to the consumer.
82 def run_test(self, test_index):
83 test = self.run_instance.tests[test_index]
85 self.run_instance.execute_test(test)
86 except KeyboardInterrupt:
87 # This is a sad hack. Unfortunately subprocess goes
88 # bonkers with ctrl-c and we start forking merrily.
89 print('\nCtrl-C detected, goodbye.')
# NOTE(review): whatever follows the print in the handler is not shown.
# Report the test along with its index so the consumer can identify it.
91 self.consumer.update(test_index, test)
# Results consumer selected by execute_tests for the threading implementation;
# forwards each finished test to `display`.
93 class ThreadResultsConsumer(object):
94 def __init__(self, display):
95 self.display = display
# Lock owned by this consumer.
# NOTE(review): presumably held around display.update() -- the acquire and
# release calls are not shown here; confirm.
96 self.lock = threading.Lock()
# Called for each completed test; `test_index` is not used in the lines shown.
98 def update(self, test_index, test):
101 self.display.update(test)
# Same method names as MultiprocessResultsConsumer (update / task_finished /
# handle_results), so either object can be passed as the consumer.
# NOTE(review): the bodies of the next two methods are not shown.
105 def task_finished(self):
108 def handle_results(self):
# Results consumer selected by execute_tests for the multiprocessing
# implementation: child processes put results on a multiprocessing.Queue and
# the parent drains that queue in handle_results().
111 class MultiprocessResultsConsumer(object):
# run:      object whose `tests` list is indexed in handle_results().
# display:  object whose update(test) is called in the parent.
# num_jobs: number of task completions handle_results() waits for.
# NOTE(review): handle_results() reads self.run, whose assignment is not
# shown here.
112 def __init__(self, run, display, num_jobs):
114 self.display = display
115 self.num_jobs = num_jobs
# Output queue carrying results from the children back to the parent.
116 self.queue = multiprocessing.Queue()
118 def update(self, test_index, test):
119 # This method is called in the child processes, and communicates the
120 # results to the actual display implementation via an output queue.
# Only the index and the result object are sent, not the test itself.
121 self.queue.put((test_index, test.result))
123 def task_finished(self):
124 # This method is called in the child processes, and communicates that
125 # individual tasks are complete.
# NOTE(review): the statement that performs the signalling is not shown.
128 def handle_results(self):
129 # This method is called in the parent, and consumes the results from the
130 # output queue and dispatches to the actual display. The method will
131 # complete after each of num_jobs tasks has signalled completion.
# NOTE(review): initialisation and increment of `completed` are not shown.
133 while completed != self.num_jobs:
134 # Wait for a result item.
135 item = self.queue.get()
140 # Update the test result in the parent process.
# NOTE(review): `index` is presumably unpacked from `item`; not shown.
142 test = self.run.tests[index]
145 self.display.update(test)
# Task entry point: builds a Tester from the given run, provider and consumer.
# It is used as the `target` of each parallel task and is also called directly
# by execute_tests.
# NOTE(review): the third parameter is named `display`, but the callers in
# this file pass the results consumer object.
# NOTE(review): the rest of the body is not shown.
147 def run_one_tester(run, provider, display):
148 tester = Tester(run, provider, display)
155 This class represents a concrete, configured testing run.
# lit_config: configuration object; execute_test passes it to the test format
#             and reads its `debug` attribute.
# tests:      the tests belonging to this run.
# NOTE(review): self.tests is read elsewhere in this class; its assignment is
# not shown here.
158 def __init__(self, lit_config, tests):
159 self.lit_config = lit_config
# Run one test through its configured test format, time the execution, and
# store the outcome on the test with test.setResult().
162 def execute_test(self, test):
# Start timestamp; used to compute result.elapsed below.
164 start_time = time.time()
166 result = test.config.test_format.execute(test, self.lit_config)
168 # Support deprecated result from execute() which returned the result
169 # code and additional output as a tuple.
170 if isinstance(result, tuple):
171 code, output = result
172 result = lit.Test.Result(code, output)
173 elif not isinstance(result, lit.Test.Result):
174 raise ValueError("unexpected result from test execution")
# NOTE(review): handler body not shown -- presumably re-raises so that Ctrl-C
# is not converted into a test result; confirm.
175 except KeyboardInterrupt:
# NOTE(review): the body of the debug branch is not shown.
178 if self.lit_config.debug:
# On this failure path the formatted traceback becomes the output of an
# UNRESOLVED result.
180 output = 'Exception during script execution:\n'
181 output += traceback.format_exc()
183 result = lit.Test.Result(lit.Test.UNRESOLVED, output)
# Elapsed time in seconds since start_time.
184 result.elapsed = time.time() - start_time
186 test.setResult(result)
# use_processes: when true, and jobs != 1 and multiprocessing is available,
# tasks are multiprocessing.Process objects; otherwise threading.Thread.
188 def execute_tests(self, display, jobs, max_time=None,
189 use_processes=False):
191 execute_tests(display, jobs, [max_time])
193 Execute each of the tests in the run, using up to jobs number of
194 parallel tasks, and inform the display of each individual result. The
195 provided tests should be a subset of the tests available in this run
198 If max_time is non-None, it should be a time in seconds after which to
199 stop executing tests.
201 The display object will have its update method called with each test as
202 it is completed. The calls are guaranteed to be locked with respect to
203 one another, but are *not* guaranteed to be called on the same thread as
204 this method was invoked on.
206 Upon completion, each test in the run will have its result
207 computed. Tests which were not actually executed (for any reason) will
208 be given an UNRESOLVED result.
211 # Choose the appropriate parallel execution implementation.
# Threads are used when a single job is requested, when processes were not
# asked for, or when the multiprocessing module is unavailable (the name is
# set to None at the top of the file in that case).
212 if jobs == 1 or not use_processes or multiprocessing is None:
213 task_impl = threading.Thread
214 queue_impl = queue.Queue
215 canceled_flag = LockedValue(0)
216 consumer = ThreadResultsConsumer(display)
# Process-based counterparts of the four choices above.
218 task_impl = multiprocessing.Process
219 queue_impl = multiprocessing.Queue
# 'i' is the typecode for a signed int; the shared flag starts at 0.
220 canceled_flag = multiprocessing.Value('i', 0)
221 consumer = MultiprocessResultsConsumer(self, display, jobs)
223 # Create the test provider.
224 provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag)
226 # Install a console-control signal handler on Windows.
# win32api is None when the module is not available.
227 if win32api is not None:
# NOTE(review): handler body not shown -- presumably cancels the provider;
# confirm.
228 def console_ctrl_handler(type):
231 win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
233 # Install a timeout handler, if requested.
234 if max_time is not None:
# NOTE(review): handler body not shown -- presumably cancels the provider;
# confirm.
235 def timeout_handler():
# threading.Timer invokes timeout_handler on its own thread once max_time
# seconds have passed, unless cancel() is called first.
237 timeout_timer = threading.Timer(max_time, timeout_handler)
238 timeout_timer.start()
240 # If not using multiple tasks, just run the tests directly.
# NOTE(review): the condition selecting between the next two calls is not
# shown.
242 run_one_tester(self, provider, consumer)
244 # Otherwise, execute the tests in parallel
245 self._execute_tests_in_parallel(task_impl, provider, consumer, jobs)
247 # Cancel the timeout handler.
248 if max_time is not None:
249 timeout_timer.cancel()
251 # Update results for any tests which weren't run.
252 for test in self.tests:
253 if test.result is None:
# Empty output and 0.0 elapsed time for tests that never executed.
254 test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
256 def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
257 # Start all of the tasks.
258 tasks = [task_impl(target=run_one_tester,
259 args=(self, provider, consumer))
260 for i in range(jobs)]
264 # Allow the consumer to handle results, if necessary.
265 consumer.handle_results()
267 # Wait for all the tasks to complete.