import lit.ProgressBar
import lit.LitConfig
import lit.Test
+import lit.run
import lit.util
import lit.discovery
def update(self, test):
# Avoid locking overhead in quiet mode
- if self.opts.quiet and not test.result.isFailure:
+ if self.opts.quiet and not test.result.code.isFailure:
self.completed += 1
return
self.progressBar.update(float(self.completed)/self.numTests,
test.getFullName())
- if self.opts.succinct and not test.result.isFailure:
+ if self.opts.succinct and not test.result.code.isFailure:
return
if self.progressBar:
self.progressBar.clear()
- print('%s: %s (%d of %d)' % (test.result.name, test.getFullName(),
+ print('%s: %s (%d of %d)' % (test.result.code.name, test.getFullName(),
self.completed, self.numTests))
- if test.result.isFailure and self.opts.showOutput:
+ if test.result.code.isFailure and self.opts.showOutput:
print("%s TEST '%s' FAILED %s" % ('*'*20, test.getFullName(),
'*'*20))
- print(test.output)
+ print(test.result.output)
print("*" * 20)
sys.stdout.flush()
return item
class Tester(threading.Thread):
- def __init__(self, litConfig, provider, display):
+ def __init__(self, run_instance, provider, display):
threading.Thread.__init__(self)
- self.litConfig = litConfig
+ self.run_instance = run_instance
self.provider = provider
self.display = display
self.runTest(item)
def runTest(self, test):
- result = None
- startTime = time.time()
try:
- result, output = test.config.test_format.execute(test,
- self.litConfig)
+ self.run_instance.execute_test(test)
except KeyboardInterrupt:
# This is a sad hack. Unfortunately subprocess goes
# bonkers with ctrl-c and we start forking merrily.
print('\nCtrl-C detected, goodbye.')
os.kill(0,9)
- except:
- if self.litConfig.debug:
- raise
- result = lit.Test.UNRESOLVED
- output = 'Exception during script execution:\n'
- output += traceback.format_exc()
- output += '\n'
- elapsed = time.time() - startTime
-
- test.setResult(result, output, elapsed)
self.display.update(test)
-def runTests(numThreads, litConfig, provider, display):
+def runTests(numThreads, run, provider, display):
# If only using one testing thread, don't use threads at all; this lets us
# profile, among other things.
if numThreads == 1:
- t = Tester(litConfig, provider, display)
+ t = Tester(run, provider, display)
t.run()
return
# Otherwise spin up the testing threads and wait for them to finish.
- testers = [Tester(litConfig, provider, display)
+ testers = [Tester(run, provider, display)
for i in range(numThreads)]
for t in testers:
t.start()
params = userParams,
config_prefix = opts.configPrefix)
- tests = lit.discovery.find_tests_for_inputs(litConfig, inputs)
+ # Perform test discovery.
+ run = lit.run.Run(litConfig,
+ lit.discovery.find_tests_for_inputs(litConfig, inputs))
if opts.showSuites or opts.showTests:
# Aggregate the tests by suite.
suitesAndTests = {}
- for t in tests:
+ for t in run.tests:
if t.suite not in suitesAndTests:
suitesAndTests[t.suite] = []
suitesAndTests[t.suite].append(t)
sys.exit(0)
# Select and order the tests.
- numTotalTests = len(tests)
+ numTotalTests = len(run.tests)
# First, select based on the filter expression if given.
if opts.filter:
except:
parser.error("invalid regular expression for --filter: %r" % (
opts.filter))
- tests = [t for t in tests
- if rex.search(t.getFullName())]
+ run.tests = [t for t in run.tests
+ if rex.search(t.getFullName())]
# Then select the order.
if opts.shuffle:
- random.shuffle(tests)
+ random.shuffle(run.tests)
else:
- tests.sort(key = lambda t: t.getFullName())
+ run.tests.sort(key = lambda t: t.getFullName())
# Finally limit the number of tests, if desired.
if opts.maxTests is not None:
- tests = tests[:opts.maxTests]
+ run.tests = run.tests[:opts.maxTests]
# Don't create more threads than tests.
- opts.numThreads = min(len(tests), opts.numThreads)
+ opts.numThreads = min(len(run.tests), opts.numThreads)
extra = ''
- if len(tests) != numTotalTests:
+ if len(run.tests) != numTotalTests:
extra = ' of %d' % numTotalTests
- header = '-- Testing: %d%s tests, %d threads --'%(len(tests),extra,
+ header = '-- Testing: %d%s tests, %d threads --'%(len(run.tests), extra,
opts.numThreads)
progressBar = None
print(header)
startTime = time.time()
- display = TestingProgressDisplay(opts, len(tests), progressBar)
- provider = TestProvider(tests, opts.maxTime)
+ display = TestingProgressDisplay(opts, len(run.tests), progressBar)
+ provider = TestProvider(run.tests, opts.maxTime)
try:
import win32api
return True
win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
- runTests(opts.numThreads, litConfig, provider, display)
+ runTests(opts.numThreads, run, provider, display)
display.finish()
if not opts.quiet:
print('Testing Time: %.2fs'%(time.time() - startTime))
# Update results for any tests which weren't run.
- for t in tests:
- if t.result is None:
- t.setResult(lit.Test.UNRESOLVED, '', 0.0)
+ for test in run.tests:
+ if test.result is None:
+ test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
# List test results organized by kind.
hasFailures = False
byCode = {}
- for t in tests:
- if t.result not in byCode:
- byCode[t.result] = []
- byCode[t.result].append(t)
- if t.result.isFailure:
+ for test in run.tests:
+ if test.result.code not in byCode:
+ byCode[test.result.code] = []
+ byCode[test.result.code].append(test)
+ if test.result.code.isFailure:
hasFailures = True
# Print each test in any of the failing groups.
continue
print('*'*20)
print('%s (%d):' % (title, len(elts)))
- for t in elts:
- print(' %s' % t.getFullName())
+ for test in elts:
+ print(' %s' % test.getFullName())
sys.stdout.write('\n')
- if opts.timeTests and tests:
+ if opts.timeTests and run.tests:
# Order by time.
- test_times = [(t.getFullName(), t.elapsed)
- for t in tests]
+ test_times = [(test.getFullName(), test.result.elapsed)
+ for test in run.tests]
lit.util.printHistogram(test_times, title='Tests')
for name,code in (('Expected Passes ', lit.Test.PASS),