-class TestProvider:
- def __init__(self, tests, maxTime):
- self.maxTime = maxTime
- self.iter = iter(tests)
- self.lock = threading.Lock()
- self.startTime = time.time()
- self.canceled = False
-
- def cancel(self):
- self.lock.acquire()
- self.canceled = True
- self.lock.release()
-
- def get(self):
- # Check if we have run out of time.
- if self.maxTime is not None:
- if time.time() - self.startTime > self.maxTime:
- return None
-
- # Otherwise take the next test.
- self.lock.acquire()
- if self.canceled:
- self.lock.release()
- return None
- for item in self.iter:
- break
- else:
- item = None
- self.lock.release()
- return item
-
-class Tester(threading.Thread):
- def __init__(self, run_instance, provider, display):
- threading.Thread.__init__(self)
- self.run_instance = run_instance
- self.provider = provider
- self.display = display
-
- def run(self):
- while 1:
- item = self.provider.get()
- if item is None:
- break
- self.runTest(item)
-
- def runTest(self, test):
- try:
- self.run_instance.execute_test(test)
- except KeyboardInterrupt:
- # This is a sad hack. Unfortunately subprocess goes
- # bonkers with ctrl-c and we start forking merrily.
- print('\nCtrl-C detected, goodbye.')
- os.kill(0,9)
- self.display.update(test)
-
-def runTests(numThreads, run, provider, display):
- # If only using one testing thread, don't use threads at all; this lets us
- # profile, among other things.
- if numThreads == 1:
- t = Tester(run, provider, display)
- t.run()
+def write_test_results(run, lit_config, testing_time, output_path):
+ try:
+ import json
+ except ImportError:
+ lit_config.fatal('test output unsupported with Python 2.5')
+
+ # Construct the data we will write.
+ data = {}
+ # Encode the current lit version as a schema version.
+ data['__version__'] = lit.__versioninfo__
+ data['elapsed'] = testing_time
+ # FIXME: Record some information on the lit configuration used?
+ # FIXME: Record information from the individual test suites?
+
+ # Encode the tests.
+ data['tests'] = tests_data = []
+ for test in run.tests:
+ test_data = {
+ 'name' : test.getFullName(),
+ 'code' : test.result.code.name,
+ 'output' : test.result.output,
+ 'elapsed' : test.result.elapsed }
+
+ # Add test metrics, if present.
+ if test.result.metrics:
+ test_data['metrics'] = metrics_data = {}
+ for key, value in test.result.metrics.items():
+ metrics_data[key] = value.todata()
+
+ tests_data.append(test_data)
+
+ # Write the output.
+ f = open(output_path, 'w')
+ try:
+ json.dump(data, f, indent=2, sort_keys=True)
+ f.write('\n')
+ finally:
+ f.close()
+
+def update_incremental_cache(test):
+ if not test.result.code.isFailure: