[lit] Raise the default soft process limit when possible
[oota-llvm.git] / utils / lit / lit / run.py
1 import os
2 import threading
3 import time
4 import traceback
5 try:
6     import Queue as queue
7 except ImportError:
8     import queue
9
10 try:
11     import win32api
12 except ImportError:
13     win32api = None
14
15 try:
16     import multiprocessing
17 except ImportError:
18     multiprocessing = None
19
20 import lit.Test
21
22 ###
23 # Test Execution Implementation
24
25 class LockedValue(object):
26     def __init__(self, value):
27         self.lock = threading.Lock()
28         self._value = value
29
30     def _get_value(self):
31         self.lock.acquire()
32         try:
33             return self._value
34         finally:
35             self.lock.release()
36
37     def _set_value(self, value):
38         self.lock.acquire()
39         try:
40             self._value = value
41         finally:
42             self.lock.release()
43
44     value = property(_get_value, _set_value)
45
46 class TestProvider(object):
47     def __init__(self, tests, num_jobs, queue_impl, canceled_flag):
48         self.canceled_flag = canceled_flag
49
50         # Create a shared queue to provide the test indices.
51         self.queue = queue_impl()
52         for i in range(len(tests)):
53             self.queue.put(i)
54         for i in range(num_jobs):
55             self.queue.put(None)
56
57     def cancel(self):
58         self.canceled_flag.value = 1
59
60     def get(self):
61         # Check if we are canceled.
62         if self.canceled_flag.value:
63           return None
64
65         # Otherwise take the next test.
66         return self.queue.get()
67
68 class Tester(object):
69     def __init__(self, run_instance, provider, consumer):
70         self.run_instance = run_instance
71         self.provider = provider
72         self.consumer = consumer
73
74     def run(self):
75         while True:
76             item = self.provider.get()
77             if item is None:
78                 break
79             self.run_test(item)
80         self.consumer.task_finished()
81
82     def run_test(self, test_index):
83         test = self.run_instance.tests[test_index]
84         try:
85             self.run_instance.execute_test(test)
86         except KeyboardInterrupt:
87             # This is a sad hack. Unfortunately subprocess goes
88             # bonkers with ctrl-c and we start forking merrily.
89             print('\nCtrl-C detected, goodbye.')
90             os.kill(0,9)
91         self.consumer.update(test_index, test)
92
93 class ThreadResultsConsumer(object):
94     def __init__(self, display):
95         self.display = display
96         self.lock = threading.Lock()
97
98     def update(self, test_index, test):
99         self.lock.acquire()
100         try:
101             self.display.update(test)
102         finally:
103             self.lock.release()
104
105     def task_finished(self):
106         pass
107
108     def handle_results(self):
109         pass
110
111 class MultiprocessResultsConsumer(object):
112     def __init__(self, run, display, num_jobs):
113         self.run = run
114         self.display = display
115         self.num_jobs = num_jobs
116         self.queue = multiprocessing.Queue()
117
118     def update(self, test_index, test):
119         # This method is called in the child processes, and communicates the
120         # results to the actual display implementation via an output queue.
121         self.queue.put((test_index, test.result))
122
123     def task_finished(self):
124         # This method is called in the child processes, and communicates that
125         # individual tasks are complete.
126         self.queue.put(None)
127
128     def handle_results(self):
129         # This method is called in the parent, and consumes the results from the
130         # output queue and dispatches to the actual display. The method will
131         # complete after each of num_jobs tasks has signalled completion.
132         completed = 0
133         while completed != self.num_jobs:
134             # Wait for a result item.
135             item = self.queue.get()
136             if item is None:
137                 completed += 1
138                 continue
139
140             # Update the test result in the parent process.
141             index,result = item
142             test = self.run.tests[index]
143             test.result = result
144
145             self.display.update(test)
146
147 def run_one_tester(run, provider, display):
148     tester = Tester(run, provider, display)
149     tester.run()
150
151 ###
152
153 class Run(object):
154     """
155     This class represents a concrete, configured testing run.
156     """
157
158     def __init__(self, lit_config, tests):
159         self.lit_config = lit_config
160         self.tests = tests
161
162     def execute_test(self, test):
163         result = None
164         start_time = time.time()
165         try:
166             result = test.config.test_format.execute(test, self.lit_config)
167
168             # Support deprecated result from execute() which returned the result
169             # code and additional output as a tuple.
170             if isinstance(result, tuple):
171                 code, output = result
172                 result = lit.Test.Result(code, output)
173             elif not isinstance(result, lit.Test.Result):
174                 raise ValueError("unexpected result from test execution")
175         except KeyboardInterrupt:
176             raise
177         except:
178             if self.lit_config.debug:
179                 raise
180             output = 'Exception during script execution:\n'
181             output += traceback.format_exc()
182             output += '\n'
183             result = lit.Test.Result(lit.Test.UNRESOLVED, output)
184         result.elapsed = time.time() - start_time
185
186         test.setResult(result)
187
188     def execute_tests(self, display, jobs, max_time=None,
189                       use_processes=False):
190         """
191         execute_tests(display, jobs, [max_time])
192
193         Execute each of the tests in the run, using up to jobs number of
194         parallel tasks, and inform the display of each individual result. The
195         provided tests should be a subset of the tests available in this run
196         object.
197
198         If max_time is non-None, it should be a time in seconds after which to
199         stop executing tests.
200
201         The display object will have its update method called with each test as
202         it is completed. The calls are guaranteed to be locked with respect to
203         one another, but are *not* guaranteed to be called on the same thread as
204         this method was invoked on.
205
206         Upon completion, each test in the run will have its result
207         computed. Tests which were not actually executed (for any reason) will
208         be given an UNRESOLVED result.
209         """
210
211         # Choose the appropriate parallel execution implementation.
212         consumer = None
213         if jobs != 1 and use_processes and multiprocessing:
214             try:
215                 task_impl = multiprocessing.Process
216                 queue_impl = multiprocessing.Queue
217                 canceled_flag =  multiprocessing.Value('i', 0)
218                 consumer = MultiprocessResultsConsumer(self, display, jobs)
219             except:
220                 # multiprocessing fails to initialize with certain OpenBSD and
221                 # FreeBSD Python versions: http://bugs.python.org/issue3770
222                 # Unfortunately the error raised also varies by platform.
223                 self.lit_config.note('failed to initialize multiprocessing')
224                 consumer = None
225         if not consumer:
226             task_impl = threading.Thread
227             queue_impl = queue.Queue
228             canceled_flag = LockedValue(0)
229             consumer = ThreadResultsConsumer(display)
230
231         # Because some tests use threads internally, and at least on Linux each
232         # of these threads counts toward the current process limit, try to
233         # raise the (soft) process limit so that tests don't fail due to
234         # resource exhaustion.
235         try:
236           cpus = lit.util.detectCPUs()
237           desired_limit = jobs * cpus * 2 # the 2 is a safety factor
238
239           # Import the resource module here inside this try block because it
240           # will likely fail on Windows.
241           import resource
242
243           max_procs_soft, max_procs_hard = resource.getrlimit(resource.RLIMIT_NPROC)
244           desired_limit = min(desired_limit, max_procs_hard)
245
246           if max_procs_soft < desired_limit:
247             resource.setrlimit(resource.RLIMIT_NPROC, (desired_limit, max_procs_hard))
248             self.lit_config.note('raised the process limit from %d to %d' % \
249                                  (max_procs_soft, desired_limit))
250         except:
251           pass
252
253         # Create the test provider.
254         provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag)
255
256         # Install a console-control signal handler on Windows.
257         if win32api is not None:
258             def console_ctrl_handler(type):
259                 provider.cancel()
260                 return True
261             win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
262
263         # Install a timeout handler, if requested.
264         if max_time is not None:
265             def timeout_handler():
266                 provider.cancel()
267             timeout_timer = threading.Timer(max_time, timeout_handler)
268             timeout_timer.start()
269
270         # If not using multiple tasks, just run the tests directly.
271         if jobs == 1:
272             run_one_tester(self, provider, consumer)
273         else:
274             # Otherwise, execute the tests in parallel
275             self._execute_tests_in_parallel(task_impl, provider, consumer, jobs)
276
277         # Cancel the timeout handler.
278         if max_time is not None:
279             timeout_timer.cancel()
280
281         # Update results for any tests which weren't run.
282         for test in self.tests:
283             if test.result is None:
284                 test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
285
286     def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
287         # Start all of the tasks.
288         tasks = [task_impl(target=run_one_tester,
289                            args=(self, provider, consumer))
290                  for i in range(jobs)]
291         for t in tasks:
292             t.start()
293
294         # Allow the consumer to handle results, if necessary.
295         consumer.handle_results()
296
297         # Wait for all the tasks to complete.
298         for t in tasks:
299             t.join()