self.iter = iter(tests)
self.lock = threading.Lock()
self.startTime = time.time()
+ self.canceled = False
+
+ def cancel(self):
+ self.lock.acquire()
+ self.canceled = True
+ self.lock.release()
def get(self):
# Check if we have run out of time.
# Otherwise take the next test.
self.lock.acquire()
+ if self.canceled:
+ self.lock.release()
+ return None
+
try:
item = self.iter.next()
except StopIteration:
startTime = time.time()
display = TestingProgressDisplay(opts, len(tests), progressBar)
provider = TestProvider(tests, opts.maxTime)
+
+ try:
+ import win32api
+ except ImportError:
+ pass
+ else:
+ def console_ctrl_handler(type):
+ provider.cancel()
+ return True
+ win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
+
runTests(opts.numThreads, litConfig, provider, display)
display.finish()