[lit] Add support for multiprocessing, under --use-processes for now.
authorDaniel Dunbar <daniel@zuster.org>
Thu, 29 Aug 2013 00:54:23 +0000 (00:54 +0000)
committerDaniel Dunbar <daniel@zuster.org>
Thu, 29 Aug 2013 00:54:23 +0000 (00:54 +0000)
git-svn-id: https://llvm.org/svn/llvm-project/llvm/trunk@189556 91177308-0d34-0410-b5e6-96231b3b80d8

utils/lit/TODO
utils/lit/lit/main.py
utils/lit/lit/run.py

index e6aeb3d933904aaeb2e673ea2171240f3efcc60f..c1a60c6f4f09f764478c67cedd17f26226107c66 100644 (file)
@@ -113,8 +113,8 @@ Infrastructure
   module. This is currently blocked on:
 
   * The external execution mode is faster in some situations, because it avoids
-    being bottlenecked on the GIL. We could fix this by moving to a good
-    multiprocessing model.
+    being bottlenecked on the GIL. This can hopefully be obviated simply by
+    using --use-processes.
 
   * Some tests in LLVM/Clang are explicitly disabled with the internal shell
     (because they use features specific to bash). We would need to rewrite these
@@ -158,8 +158,6 @@ Miscellaneous
 
 * Add --show-unsupported, don't show by default?
 
-* Optionally use multiprocessing.
-
 * Support valgrind in all configs, and LLVM style valgrind.
 
 * Support a timeout / ulimit.
index 5c40f1ca5337f144f32b6e8cbb3fec8cfd266800..50c9a66c8d3bfd0167cd33f4408f9068c301c40d 100755 (executable)
@@ -142,6 +142,12 @@ def main(builtinParameters = {}):
     group.add_option("", "--show-tests", dest="showTests",
                       help="Show all discovered tests",
                       action="store_true", default=False)
+    group.add_option("", "--use-processes", dest="useProcesses",
+                      help="Run tests in parallel with processes (not threads)",
+                      action="store_true", default=False)
+    group.add_option("", "--use-threads", dest="useProcesses",
+                      help="Run tests in parallel with threads (not processes)",
+                      action="store_false", default=False)
     parser.add_option_group(group)
 
     (opts, args) = parser.parse_args()
@@ -264,7 +270,8 @@ def main(builtinParameters = {}):
     startTime = time.time()
     display = TestingProgressDisplay(opts, len(run.tests), progressBar)
     try:
-        run.execute_tests(display, opts.numThreads, opts.maxTime)
+        run.execute_tests(display, opts.numThreads, opts.maxTime,
+                          opts.useProcesses)
     except KeyboardInterrupt:
         sys.exit(2)
     display.finish()
index 617c3b988f0854c4c9fb0410bce708d43779f7d9..8642ff189270428e0e5ffe5b055a741b31c80a6a 100644 (file)
@@ -2,42 +2,68 @@ import os
 import threading
 import time
 import traceback
+try:
+    import Queue as queue
+except ImportError:
+    import queue
 
 try:
     import win32api
 except ImportError:
     win32api = None
 
+try:
+    import multiprocessing
+except ImportError:
+    multiprocessing = None
+
 import lit.Test
 
 ###
 # Test Execution Implementation
 
-class TestProvider(object):
-    def __init__(self, tests):
-        self.iter = iter(range(len(tests)))
+class LockedValue(object):
+    def __init__(self, value):
         self.lock = threading.Lock()
-        self.canceled = False
+        self._value = value
 
-    def cancel(self):
+    def _get_value(self):
         self.lock.acquire()
-        self.canceled = True
-        self.lock.release()
+        try:
+            return self._value
+        finally:
+            self.lock.release()
 
-    def get(self):
-        # Check if we are cancelled.
+    def _set_value(self, value):
         self.lock.acquire()
-        if self.canceled:
-          self.lock.release()
+        try:
+            self._value = value
+        finally:
+            self.lock.release()
+
+    value = property(_get_value, _set_value)
+
+class TestProvider(object):
+    def __init__(self, tests, num_jobs, queue_impl, canceled_flag):
+        self.canceled_flag = canceled_flag
+
+        # Create a shared queue to provide the test indices.
+        self.queue = queue_impl()
+        for i in range(len(tests)):
+            self.queue.put(i)
+        for i in range(num_jobs):
+            self.queue.put(None)
+
+    def cancel(self):
+        self.canceled_flag.value = 1
+
+    def get(self):
+        # Check if we are canceled.
+        if self.canceled_flag.value:
           return None
 
         # Otherwise take the next test.
-        for item in self.iter:
-            break
-        else:
-            item = None
-        self.lock.release()
-        return item
+        return self.queue.get()
 
 class Tester(object):
     def __init__(self, run_instance, provider, consumer):
@@ -46,7 +72,7 @@ class Tester(object):
         self.consumer = consumer
 
     def run(self):
-        while 1:
+        while True:
             item = self.provider.get()
             if item is None:
                 break
@@ -82,6 +108,42 @@ class ThreadResultsConsumer(object):
     def handle_results(self):
         pass
 
+class MultiprocessResultsConsumer(object):
+    def __init__(self, run, display, num_jobs):
+        self.run = run
+        self.display = display
+        self.num_jobs = num_jobs
+        self.queue = multiprocessing.Queue()
+
+    def update(self, test_index, test):
+        # This method is called in the child processes, and communicates the
+        # results to the actual display implementation via an output queue.
+        self.queue.put((test_index, test.result))
+
+    def task_finished(self):
+        # This method is called in the child processes, and communicates that
+        # individual tasks are complete.
+        self.queue.put(None)
+
+    def handle_results(self):
+        # This method is called in the parent, and consumes the results from the
+        # output queue and dispatches to the actual display. The method will
+        # complete after each of num_jobs tasks has signalled completion.
+        completed = 0
+        while completed != self.num_jobs:
+            # Wait for a result item.
+            item = self.queue.get()
+            if item is None:
+                completed += 1
+                continue
+
+            # Update the test result in the parent process.
+            index,result = item
+            test = self.run.tests[index]
+            test.result = result
+
+            self.display.update(test)
+
 def run_one_tester(run, provider, display):
     tester = Tester(run, provider, display)
     tester.run()
@@ -123,7 +185,8 @@ class Run(object):
 
         test.setResult(result)
 
-    def execute_tests(self, display, jobs, max_time=None):
+    def execute_tests(self, display, jobs, max_time=None,
+                      use_processes=False):
         """
         execute_tests(display, jobs, [max_time])
 
@@ -145,8 +208,20 @@ class Run(object):
         be given an UNRESOLVED result.
         """
 
-        # Create the test provider object.
-        provider = TestProvider(self.tests)
+        # Choose the appropriate parallel execution implementation.
+        if jobs == 1 or not use_processes or multiprocessing is None:
+            task_impl = threading.Thread
+            queue_impl = queue.Queue
+            canceled_flag = LockedValue(0)
+            consumer = ThreadResultsConsumer(display)
+        else:
+            task_impl = multiprocessing.Process
+            queue_impl = multiprocessing.Queue
+            canceled_flag =  multiprocessing.Value('i', 0)
+            consumer = MultiprocessResultsConsumer(self, display, jobs)
+
+        # Create the test provider.
+        provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag)
 
         # Install a console-control signal handler on Windows.
         if win32api is not None:
@@ -162,8 +237,12 @@ class Run(object):
             timeout_timer = threading.Timer(max_time, timeout_handler)
             timeout_timer.start()
 
-        # Actually execute the tests.
-        self._execute_tests_with_provider(provider, display, jobs)
+        # If not using multiple tasks, just run the tests directly.
+        if jobs == 1:
+            run_one_tester(self, provider, consumer)
+        else:
+            # Otherwise, execute the tests in parallel
+            self._execute_tests_in_parallel(task_impl, provider, consumer, jobs)
 
         # Cancel the timeout handler.
         if max_time is not None:
@@ -174,18 +253,10 @@ class Run(object):
             if test.result is None:
                 test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
 
-    def _execute_tests_with_provider(self, provider, display, jobs):
-        consumer = ThreadResultsConsumer(display)
-
-        # If only using one testing thread, don't use tasks at all; this lets us
-        # profile, among other things.
-        if jobs == 1:
-            run_one_tester(self, provider, consumer)
-            return
-
+    def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
         # Start all of the tasks.
-        tasks = [threading.Thread(target=run_one_tester,
-                                  args=(self, provider, consumer))
+        tasks = [task_impl(target=run_one_tester,
+                           args=(self, provider, consumer))
                  for i in range(jobs)]
         for t in tasks:
             t.start()