[PATCH] New scheduler.

Nathaniel Smith njs at pobox.com
Mon Jul 28 23:18:22 UTC 2003


QMTest's current test scheduler is not very scalable at all.  This
scheduler aims to be much more scalable, and also more correct (it
should be able, for example, to properly handle the case where a test
is not runnable on any available target).  I still need to add some
more explanation and fix up the test suite (currently
regressions.cycle5 is giving a spurious failure, and also I need to
add some more tests), but this is a big enough change I wouldn't mind
some comments first.

Ignore the profiler bit, that's just for testing.

-- Nathaniel

-- 
.i dei jitfa fanmo xatra

This email may be read aloud.
-------------- next part --------------
diff -urN --exclude='*~' --exclude='.*' --exclude=CVS --exclude='*.pyo' --exclude='*.pyc' --exclude=build --exclude=GNUmakefile --exclude=config.log --exclude=config.status --exclude=setup_path.py --exclude=qm.sh --exclude=qmtest --exclude=qm.spec --exclude='*.dtd' --exclude=CATALOG --exclude=thread_target --exclude=process_target --exclude='*.qmr' qm-clean/ChangeLog qm-efficient-scheduling/ChangeLog
--- qm-clean/ChangeLog	2003-07-24 16:54:20.000000000 -0700
+++ qm-efficient-scheduling/ChangeLog	2003-07-26 04:57:00.000000000 -0700
@@ -1,3 +1,7 @@
+2003-07-26  Nathaniel Smith  <njs at codesourcery.com>
+
+	* qm/test/execution_engine.py: Rewrite scheduling logic.
+
 2003-07-24  Nathaniel Smith  <njs at codesourcery.com>
 
 	* GNUmakefile.in (RELLIBDIR): Don't add slashes to prefix when
diff -urN --exclude='*~' --exclude='.*' --exclude=CVS --exclude='*.pyo' --exclude='*.pyc' --exclude=build --exclude=GNUmakefile --exclude=config.log --exclude=config.status --exclude=setup_path.py --exclude=qm.sh --exclude=qmtest --exclude=qm.spec --exclude='*.dtd' --exclude=CATALOG --exclude=thread_target --exclude=process_target --exclude='*.qmr' qm-clean/qm/test/cmdline.py qm-efficient-scheduling/qm/test/cmdline.py
--- qm-clean/qm/test/cmdline.py	2003-07-24 14:12:32.000000000 -0700
+++ qm-efficient-scheduling/qm/test/cmdline.py	2003-07-28 13:29:09.000000000 -0700
@@ -1397,7 +1397,14 @@
         engine = ExecutionEngine(database, test_ids, context, targets,
                                  result_streams,
                                  self.__GetExpectedOutcomes())
-        return engine.Run()
+        if os.environ.has_key("QM_PROFILE"):
+            import hotshot
+            profiler = hotshot.Profile(os.environ["QM_PROFILE"])
+            retval = profiler.runcall(engine.Run)
+            profiler.close()
+            return retval
+        else:
+            return engine.Run()
                                                     
 
     def __ExecuteServer(self):
diff -urN --exclude='*~' --exclude='.*' --exclude=CVS --exclude='*.pyo' --exclude='*.pyc' --exclude=build --exclude=GNUmakefile --exclude=config.log --exclude=config.status --exclude=setup_path.py --exclude=qm.sh --exclude=qmtest --exclude=qm.spec --exclude='*.dtd' --exclude=CATALOG --exclude=thread_target --exclude=process_target --exclude='*.qmr' qm-clean/qm/test/execution_engine.py qm-efficient-scheduling/qm/test/execution_engine.py
--- qm-clean/qm/test/execution_engine.py	2003-07-03 12:28:22.000000000 -0700
+++ qm-efficient-scheduling/qm/test/execution_engine.py	2003-07-28 12:40:22.000000000 -0700
@@ -22,6 +22,7 @@
 import qm.queue
 from   qm.test.base import *
 import qm.test.cmdline
+import qm.test.database
 from   qm.test.context import *
 import qm.xmlutil
 from   result import *
@@ -33,19 +34,113 @@
 # Classes
 ########################################################################
 
+class _TestStatus(object):
+    """A '_TestStatus' object tracks the status of a test during a run.
+    """
+
+    # Implementation note: A 2-slot object takes less memory than the
+    # equivalent 2-element list or 2-element tuple (checked on Python
+    # 2.2.3), and provides a convenient place to package various bits of
+    # code.
+
+    __slots__ = "status", "parents"
+    # 'self.status' describes the current state of the test -- either a
+    # status indicator or a result outcome.
+    # 'self.parents' is either 'None', or a list of tests that have this
+    # test as a prerequisite.
+
+    CONSIDERING = "status_Considering"
+    RUNNABLE = "status_Runnable"
+
+    def __init__(self):
+
+        self.status = None
+        self.parents = None
+
+
+    # A test enters "Considering" state as soon as it has been pulled
+    # from the user's queue.  A test in this state will eventually run,
+    # even if it is never pulled from the queue again.  This is mainly
+    # used to discard already-seen tests when pulling them from the
+    # queue.
+    def MarkConsidering(self):
+
+        assert self.status is None
+        self.status = self.CONSIDERING
+
+
+    def Considering(self):
+
+        return self.status is self.CONSIDERING or self.Runnable()
+
+
+    # A tests enters "Runnable" state as soon as it gets put on the
+    # runnable queue.  A test in this state will run as soon as a free
+    # target opens up for it.  This is used to detect whether a test
+    # should still receive callbacks when a prerequisite finishes; if it
+    # has already become runnable, then it does not need to continue
+    # receiving callbacks.
+    def MarkRunnable(self):
+
+        assert self.status is self.CONSIDERING
+        self.status = self.RUNNABLE
+
+
+    def Runnable(self):
+
+        return self.status is self.RUNNABLE or self.Finished()
+
+
+    # A finished test is just that -- all done.  Any tests that had it
+    # as a prerequisite can now check their expected outcome against the
+    # real outcome.
+    def SetOutcome(self, outcome):
+
+        self.status = outcome
+
+
+    def Finished(self):
+
+        return self.status in Result.outcomes
+
+
+    def GetOutcome(self):
+        """Only valid to call this if 'Finished()' returns true."""
+
+        return self.status
+
+
+    def AddParent(self, parent_id):
+
+        if self.parents is None:
+            self.parents = []
+        self.parents.append(parent_id)
+
+
+    def ConsumeParents(self):
+
+        parents = self.parents
+        del self.parents
+        if parents is None:
+            return ()
+        else:
+            return parents
+
+
+
 class ExecutionEngine:
     """A 'ExecutionEngine' executes tests.
 
     A 'ExecutionEngine' object handles the execution of a collection
     of tests.
 
-    This class schedules the tests, plus the setup and cleanup of any
-    resources they require, across one or more targets.
+    This class schedules the tests across one or more targets.
 
     The shedule is determined dynamically as the tests are executed
     based on which targets are idle and which are not.  Therefore, the
     testing load should be reasonably well balanced, even across a
     heterogeneous network of testing machines."""
+
     
     def __init__(self,
                  database,
@@ -61,7 +156,7 @@
         
         'test_ids' -- A sequence of IDs of tests to run.  Where
         possible, the tests are started in the order specified.
-
+        
         'context' -- The context object to use when running tests.
 
         'targets' -- A sequence of 'Target' objects, representing
@@ -91,17 +186,13 @@
         
         # All of the targets are idle at first.
         self.__idle_targets = targets[:]
+        # And we haven't yet found any idle targets with nothing to do.
+        self.__stuck_targets = []
         # There are no responses from the targets yet.
         self.__response_queue = qm.queue.Queue(0)
         # There no pending or ready tests yet.
-        self.__pending = []
-        self.__ready = []
         self.__running = 0
 
-        # The descriptor graph has not yet been created.
-        self.__descriptors = {}
-        self.__descriptor_graph = {}
-        
         self.__any_unexpected_outcomes = 0
         
         # Termination has not yet been requested.
@@ -121,7 +212,7 @@
     def IsTerminationRequested(self):
         """Returns true if termination has been requested.
 
-        return -- True if Terminate has been called."""
+        returns -- True if Terminate has been called."""
 
         return self.__terminated
     
@@ -180,143 +271,425 @@
 
         self.__input_handlers[fd] = function
         
-    
+
     def _RunTests(self):
-        """Run all of the tests.
 
-        This function assumes that the targets have already been
-        started.
+        num_tests = len(self.__test_ids)
+
+        # No tests have been started yet.
+        self.__num_tests_started = 0
+
+        self.__tests_iterator = iter(self.__test_ids)
 
-        The tests are run in the order that they were presented --
-        modulo requirements regarding prerequisites and any
-        nondeterminism introduced by running tests in parallel."""
-
-        # Create a directed graph where each node is a pair
-        # (count, descriptor).  There is an edge from one node
-        # to another if the first node is a prerequisite for the
-        # second.  Begin by creating the nodes of the graph.
+        # A big table of all the tests we are to run, to track status
+        # information (and also to allow quick lookup of whether a
+        # listed prerequisite should actually be run).
+        self.__statuses = {}
         for id in self.__test_ids:
-            try:
-                descriptor = self.__database.GetTest(id)
-                self.__descriptors[id] = descriptor
-                self.__descriptor_graph[descriptor] = [0, []]
-                self.__pending.append(descriptor)
-            except:
-                result = Result(Result.TEST, id)
-                result.NoteException(cause = "Could not load test.",
-                                     outcome = Result.UNTESTED)
-                self._AddResult(result)
-                
-        # Create the edges.
-        for descriptor in self.__pending:
-            prereqs = descriptor.GetPrerequisites()
-            if prereqs:
-                for (prereq_id, outcome) in prereqs.items():
-                    if not self.__descriptors.has_key(prereq_id):
-                        # The prerequisite is not amongst the list of
-                        # tests to run.  In that case we do still run
-                        # the dependent test; it was explicitly
-                        # requested by the user.
-                        continue
-                    prereq_desc = self.__descriptors[prereq_id]
-                    self.__descriptor_graph[prereq_desc][1] \
-                        .append((descriptor, outcome))
-                    self.__descriptor_graph[descriptor][0] += 1
-
-            if not self.__descriptor_graph[descriptor][0]:
-                # A node with no prerequisites is ready.
-                self.__ready.append(descriptor)
-
-        # Iterate until there are no more tests to run.
-        while ((self.__pending or self.__ready)
-               and not self.IsTerminationRequested()):
-            # If there are no idle targets, block until we get a
-            # response.  There is nothing constructive we can do.
-            idle_targets = self.__idle_targets
-            if not idle_targets:
+            self.__statuses[id] = _TestStatus()
+
+        # The stack of tests whose prerequisites we are trying to
+        # satisfy.
+        self.__prereq_stack = []
+        # The same thing as a dict, for doing loop detection.
+        self.__ids_on_stack = {}
+
+        # No tests are currently runnable.  This is a dictionary indexed
+        # by target group.  Each element maps to a list of runnable
+        # tests in that target group.
+        self.__runnable = {}
+
+        while self.__num_tests_started < num_tests:
+            # Sweep through and clear any responses that have come
+            # back; this also updates the idle target list.
+            while self._CheckForResponse(wait=0):
+                pass
+
+            # Now look for idle targets.
+            if not self.__idle_targets:
+                if len(self.__stuck_targets) == len(self.__targets):
+                    # All targets are stuck.  This means that last time
+                    # through the loop they were all idle and no work
+                    # could be assigned to them; furthermore, no
+                    # prospect of further work has come in since then,
+                    # or they would have been moved back to the merely
+                    # idle list.  Therefore, any tests currently listed
+                    # as runnable have invalid targets and should be
+                    # marked UNTESTED.
+                    self._Trace("All targets stuck"
+                                " -- clearing runnable queue.")
+                    self._ClearAllRunnableTests("No matching target")
+                    continue
+                # Otherwise, we just need to block until there's work to
+                # do.
                 self._Trace("All targets are busy -- waiting.")
-                # Read a reply from the response_queue.
                 self._CheckForResponse(wait=1)
                 self._Trace("Response received.")
-                # Keep going.
+                # Found one; start over in hopes things will be better
+                # this time.
                 continue
 
-            # If there are no tests ready to run, but no tests are
-            # actually running at this time, we have
-            # a cycle in the dependency graph.  Pull the head off the
-            # pending queue and mark it UNTESTED, see if that helps.
-            if (not self.__ready and not self.__running):
-                descriptor = self.__pending[0]
-                self._Trace(("Dependency cycle, discarding %s."
-                             % descriptor.GetId()))
-                self.__pending.remove(descriptor)
-                self._AddUntestedResult(descriptor.GetId(),
+            # We are careful to only keep idle targets that we could
+            # find tests for, and to loop around again after feeding
+            # each target exactly one test.  If were to instead to give
+            # each target as many tests as it could take until it
+            # became idle, we would have problems in the serial case,
+            # because the target would never become idle, and we would
+            # never loop around to clear out the 'Result's queue.
+            new_idle = []
+            while self.__idle_targets:
+                target = self.__idle_targets.pop()
+                if not self._RunATestOn(target):
+                    # We couldn't find a test; rather than loop
+                    # around in circles, ignore this target until we
+                    # get some more runnable tests.
+                    self.__stuck_targets.append(target)
+                elif target.IsIdle():
+                    # Target is still idle.
+                    new_idle.append(target)
+            self.__idle_targets = new_idle
+
+        # Now all tests have been started; we just have wait for them
+        # all to finish.
+        while self.__running:
+            self._CheckForResponse(wait=1)
+
+
+    def _MakeRunnable(self, descriptor):
+        """Adds a test to the runnable queue."""
+
+        test_id = descriptor.GetId()
+        self._Trace("Test '%s' has become runnable." % (test_id,))
+        self.__statuses[test_id].MarkRunnable()
+
+        # Previously stuck targets may have something to do now, so
+        # unstick them all.
+        if self.__stuck_targets:
+            self.__idle_targets += self.__stuck_targets
+            self.__stuck_targets = []
+
+        target_group = descriptor.GetTargetGroup()
+        try:
+            stack = self.__runnable[target_group]
+        except KeyError:
+            stack = self.__runnable[target_group] = []
+        stack.append(test_id)
+
+
+    def _RunATestOn(self, target):
+        """Tries to run a test on the given target.
+
+        returns -- true on success, false on failure.  Failure means
+        that no test was found that could be run on this target."""
+
+        self._Trace("Looking for a test for target %s"
+                    % target.GetName())
+
+        for target_group, tests in self.__runnable.iteritems():
+            if target.IsInGroup(target_group) and tests:
+                test_id = tests.pop()
+                self._Trace("About to run '%s'." % (test_id,))
+                assert self.__statuses[test_id].Runnable()
+                try:
+                    descriptor = self.__database.GetTest(test_id)
+                except qm.test.database.NoSuchTestError:
+                    self._Trace("But it couldn't be loaded!")
+                    self._AddLoadErrorResult(test_id)
+                else:
+                    self.__num_tests_started += 1
+                    self.__running += 1
+                    target.RunTest(descriptor, self.__context)
+                return 1
+
+        # No already runnable tests.  Try to get new ones.
+        while 1:
+            descriptor = self._FindRunnableTest()
+            if descriptor is None:
+                # We're out of runnable tests altogether.
+                return 0
+            elif target.IsInGroup(descriptor.GetTargetGroup()):
+                # We found a test we can run.
+                id = descriptor.GetId()
+                self._Trace("Found runnable test '%s'" % (id,))
+                self.__num_tests_started += 1
+                self.__running += 1
+                target.RunTest(descriptor, self.__context)
+                return 1
+            else:
+                # We found a test that we can't run, but someone else
+                # can; put it on the queue for them and try again.
+                self._MakeRunnable(descriptor)
+
+
+    class _ConsumedTest(Exception):
+        """Thrown when a test is consumed instead of processed normally.
+
+        Generally this happens because an error was detected during said
+        processing, and the test was thrown out marked UNTESTED."""
+        
+        pass
+
+
+    def _FindRunnableTest(self):
+        """Attempt find at least one runnable test.
+
+        This will only return tests that are ready to be run
+        immediately, though as a side effect it will set up tests to
+        become automatically runnable later when the prerequisites they
+        depend on finish running.  All tests become runnable in one of
+        these two ways, and therefore this can be considered the core of
+        the scheduling algorithm.  It is only here that tests are pulled
+        from the user-provided list, and here is also where we detect
+        cycles.
+
+        returns -- the descriptor of the new runnable test, or 'None' if
+        no such test could be found."""
+
+        while 1:
+            if not self.__prereq_stack:
+                # We ran out of prerequisite tests, so pull a new one
+                # off the user's list.
+                try:
+                    test_id = self.__tests_iterator.next()
+                except StopIteration:
+                    # We're entirely out of fresh tests; give up.
+                    return None
+                if self.__statuses[test_id].Considering():
+                    # This test has already been handled (probably
+                    # because it's a prereq of a test already seen).
+                    continue
+                # We have a fresh test.
+                try:
+                    self._AddTestToStack(test_id)
+                except self._ConsumedTest:
+                    # Or maybe not.  Skip it and go on to the next.
+                    continue
+                self._Trace("Added new test %s to root of stack" % (test_id,))
+
+            descriptor, prereqs = self.__prereq_stack[-1]
+            # The prereqs list only mentions tests that still need to be
+            # run.
+            if prereqs:
+                # Pick a prereq and start over with it.
+                try:
+                    self._AddTestToStack(prereqs.pop())
+                    continue
+                except self._ConsumedTest:
+                    # If it ran immediately, start over with the same
+                    # one again.
+                    continue
+            else:
+                # This test is ready to come off the stack.
+                # Physically remove it from the stack.
+                test_id = descriptor.GetId()
+                del self.__ids_on_stack[test_id]
+                self.__prereq_stack.pop()
+
+                # Now, either it's ready to run, or needs to wait for
+                # some prereqs to finish.
+                try:
+                    waiting_on = self._IsWaitingFor(descriptor)
+                except self._ConsumedTest:
+                    # ... or else, it just disappears, so we try again.
+                    continue
+                
+                if not waiting_on:
+                    # We've finally found our runnable test.
+                    return descriptor
+                else:
+                    # This test will be runnable once we hear back from
+                    # the tests it depends on.  Ask them to notify it.
+                    for child_id in waiting_on:
+                        self.__statuses[child_id].AddParent(test_id)
+                    # But we still need our immediately runnable test,
+                    # so try again.
+                    continue
+
+            # Should never get here.
+            assert 0
+
+
+    def _AddTestToStack(self, test_id):
+        """Adds 'test_id' to the stack of current tests.
+
+        Updates the test status, sets up cycle detection, and suchlike.
+        May consume the passed in test; if so, will throw a
+        '_ConsumedTest' exception."""
+        
+        self._Trace("Trying to add %s to stack" % test_id)
+
+        # Update test status.
+        self.__statuses[test_id].MarkConsidering()
+
+        # Load the descriptor.
+        try:
+            descriptor = self.__database.GetTest(test_id)
+        except:
+            self._AddLoadErrorResult(test_id)
+            raise self._ConsumedTest, test_id, sys.exc_info()[2]
+
+        # Check for cycles.
+
+        # One might think that the way to detect a cycle was to check
+        # whether the test being added to the stack was already on the
+        # stack.  But in fact this doesn't work, because we have other
+        # logic to ensure that any test is only added to the stack once
+        # over the entire run.  This avoids extra work, but it thwarts
+        # the obvious cycle-detection check.  So instead we explicitly
+        # check the stack for all prerequisites of this test.
+        for prereq_id in descriptor.GetPrerequisites():
+            if prereq_id in self.__ids_on_stack:
+                self._Trace("Cycle detected (%s)" % (prereq_id,))
+                self._AddUntestedResult(test_id,
                                         qm.message("dependency cycle"))
-                self._UpdateDependentTests(descriptor, Result.UNTESTED)
+                raise self._ConsumedTest, "cycle detected"
+        self.__ids_on_stack[test_id] = None
+
+        # Finally, calculate the "interesting" prerequisites (the ones
+        # that we actually need to run).
+        def is_relevant(prereq):
+            return self.__statuses.has_key(prereq) \
+                   and not self.__statuses[prereq].Considering()
+        prereqs_iter = iter(descriptor.GetPrerequisites())
+        relevant_prereqs = filter(is_relevant, prereqs_iter)
+
+        # And store it all in the stack.
+        self.__prereq_stack.append((descriptor, relevant_prereqs))
+
+        
+    def _IsWaitingFor(self, test_descriptor):
+        """Finds the prerequisites 'test_descriptor' is waiting on.
+
+        Returns a list of id's of tests that need to complete before
+        'test_descriptor' can be run.  If any known outcomes are
+        violated, consumes the test and raises a '_ConsumedTest'
+        exception."""
+
+        id = test_descriptor.GetId()
+        needed = []
+
+        prereqs = test_descriptor.GetPrerequisites()
+        for prereq_id, outcome in prereqs.iteritems():
+            try:
+                prereq_status = self.__statuses[prereq_id]
+            except KeyError:
+                # This prerequisite is not being run at all.
                 continue
 
-            # There is at least one idle target.  Try to find something
-            # that it can do.
-            wait = 1
-            for descriptor in self.__ready:
-                for target in idle_targets:
-                    if target.IsInGroup(descriptor.GetTargetGroup()):
-                        # This test can be run on this target.  Remove
-                        # it from the ready list.
-                        self.__ready.remove(descriptor)
-                        # And from the pending list.
-                        try:
-                            self.__pending.remove(descriptor)
-                        except ValueError:
-                            # If the test is not pending, that means it
-                            # got pulled off for some reason
-                            # (e.g. breaking dependency cycles).  Don't
-                            # try to run it, it won't work.
-                            self._Trace(("Ready test %s not pending, skipped"
-                                         % descriptor.GetId()))
-                            wait = 0
-                            break
-
-                        # Output a trace message.
-                        self._Trace(("About to run %s."
-                                     % descriptor.GetId()))
-                        # Run it.
-                        self.__running += 1
-                        target.RunTest(descriptor, self.__context)
-                        # If the target is no longer idle, remove it
-                        # from the idle_targets list.
-                        if not target.IsIdle():
-                            self._Trace("Target is no longer idle.")
-                            self.__idle_targets.remove(target)
-                        else:
-                            self._Trace("Target is still idle.")
-                        # We have done something useful on this
-                        # iteration.
-                        wait = 0
-                        break
+            if prereq_status.Finished():
+                prereq_outcome = prereq_status.GetOutcome()
+                if outcome != prereq_outcome:
+                    # Failed prerequisite.
+                    self._AddUntestedResult \
+                        (id,
+                         qm.message("failed prerequisite"),
+                         {'qmtest.prequisite': prereq_id,
+                          'qmtest.outcome': prereq_outcome,
+                          'qmtest.expected_outcome': outcome })
+                    raise self._ConsumedTest
+                else:
+                    # Passed prerequisite, do nothing.
+                    pass
+            else:
+                # Unfinished prerequisite, make a note.
+                needed.append(prereq_id)
+        return needed
 
-                if not wait:
+
+    def _PrerequisiteFinishedCallback(self, test_id):
+        """Check 'test_id's prerequisites, and do the right thing.
+
+        This function is called whenever a test may have become
+        runnable, because a prerequisite's result became available.  It
+        is only called if we are not already runnable.
+
+        The "right thing" means if any prequisites fail, emit an
+        UNTESTED result; otherwise, if any prerequisites have unknown
+        result, do nothing; otherwise, add this test to the runnable
+        queue."""
+
+        self._Trace("%s had a prerequisite finish" % (test_id,))
+
+        try:
+            descriptor = self.__database.GetTest(test_id)
+        except:
+            self._AddLoadErrorResult(test_id)
+            return
+
+        try:
+            waiting_for = self._IsWaitingFor(descriptor)
+        except self._ConsumedTest:
+            return
+
+        if not waiting_for:
+            # All prerequisites ran and were satisfied.  This test can
+            # now run.
+            self._MakeRunnable(descriptor)
+            
+
+    def _HandleResult(self, result):
+        """Do processing associated with a new result.
+
+        'result' -- A 'Result' object representing the result of running
+        a test or resource."""
+
+        # Output a trace message.
+        id = result.GetId()
+        self._Trace("Recording %s result for %s." % (result.GetKind(), id))
+
+        # Find the target with the name indicated in the result.
+        if result.has_key(Result.TARGET):
+            for target in self.__targets:
+                if target.GetName() == result[Result.TARGET]:
                     break
+            else:
+                assert 0, ("No target %s exists (test id: %s)"
+                           % (result[Result.TARGET], id))
+        else:
+            # Not all results will have associated targets.  If the
+            # test was not run at all, there will be no associated
+            # target.
+            target = None
 
-            # Output a trace message.
-            self._Trace("About to check for a response in %s mode."
-                        % ((wait and "blocking") or "nonblocking"))
-                    
-            # See if any targets have finished their assignments.  If
-            # we did not schedule any additional work during this
-            # iteration of the loop, there's no point in continuing
-            # until some target finishes what it's doing.
-            self._CheckForResponse(wait=wait)
+        # Having no target is a rare occurrence; output a trace message.
+        if not target:
+            self._Trace("No target for %s." % result.GetId())
 
+        # This target might now be idle.
+        if (target
+            and target not in self.__idle_targets
+            and target not in self.__stuck_targets
+            and target.IsIdle()):
             # Output a trace message.
-            self._Trace("Done checking for responses.")
+            self._Trace("Target is now idle.\n")
+            self.__idle_targets.append(target)
+            
+        # For now, only tests have expectations or scheduling
+        # dependencies, so this next bit only applies to tests:
+        if result.GetKind() == Result.TEST:
+            # We now know this test's outcome, so record it in our global
+            # status dictionary.
+            test_status = self.__statuses[id]
+            test_status.SetOutcome(result.GetOutcome())
+
+            # And then poke all the tests that might have become runnable.
+            parents = test_status.ConsumeParents()
+            for parent_id in parents:
+                if not self.__statuses[parent_id].Runnable():
+                    self._PrerequisiteFinishedCallback(parent_id)
+
+            # Check for unexpected outcomes.
+            if result.GetKind() == Result.TEST:
+                if (self.__expectations.get(id, Result.PASS)
+                    != result.GetOutcome()):
+                    self.__any_unexpected_outcomes = 1
+            
+        # Output a trace message.
+        self._Trace("Writing result for %s to streams." % id)
 
-        # Any tests that are still pending are untested, unless there
-        # has been an explicit request that we exit immediately.
-        if not self.IsTerminationRequested():
-            for descriptor in self.__pending:
-                self._AddUntestedResult(descriptor.GetId(),
-                                        qm.message("execution terminated"))
+        # Report the result.
+        for rs in self.__result_streams:
+            rs.WriteResult(result)
 
 
     def _CheckForResponse(self, wait):
@@ -336,19 +709,12 @@
                 self._Trace("Got %s result for %s from queue."
                              % (result.GetKind(), result.GetId()))
                 # Handle it.
-                self._AddResult(result)
+                self._HandleResult(result)
                 if result.GetKind() == Result.TEST:
                     assert self.__running > 0
                     self.__running -= 1
                 # Output a trace message.
                 self._Trace("Recorded result.")
-                # If this was a test result, there may be other tests that
-                # are now eligible to run.
-                if result.GetKind() == Result.TEST:
-                    # Get the descriptor for this test.
-                    descriptor = self.__descriptors[result.GetId()]
-                    # Iterate through each of the dependent tests.
-                    self._UpdateDependentTests(descriptor, result.GetOutcome())
                 return result
             except qm.queue.Empty:
                 # If there is nothing in the queue, then this exception will
@@ -371,102 +737,20 @@
                 continue
 
 
-    def _UpdateDependentTests(self, descriptor, outcome):
-        """Update the status of tests that depend on 'node'.
+    ### Various methods to signal errors with particular tests.
 
-        'descriptor' -- A test descriptor.
 
-        'outcome' -- The outcome associated with the test.
+    def _AddErrorResult(self, result):
+        """All error results should be noted with this method.
 
-        If tests that depend on 'descriptor' required a particular
-        outcome, and 'outcome' is different, mark them as untested.  If
-        tests that depend on 'descriptor' are now eligible to run, add
-        them to the '__ready' queue."""
-
-        node = self.__descriptor_graph[descriptor]
-        for (d, o) in node[1]:
-            # Find the node for the dependent test.
-            n = self.__descriptor_graph[d]
-            # If some other prerequisite has already had an undesired
-            # outcome, there is nothing more to do.
-            if n[0] == 0:
-                continue
+        Error results are those that indicate that a test was not run.
+        This is important to keep an accurate count of how many tests
+        are left to run, and to ensure test status's are updated
+        correctly."""
 
-            # If the actual outcome is not the outcome that was
-            # expected, the dependent test cannot be run.
-            if outcome != o:
-                try:
-                    # This test will never be run.
-                    n[0] = 0
-                    self.__pending.remove(d)
-                    # Mark it untested.
-                    self._AddUntestedResult(d.GetId(),
-                                            qm.message("failed prerequisite"),
-                                            { 'qmtest.prequisite' :
-                                              descriptor.GetId(),
-                                              'qmtest.outcome' : outcome,
-                                              'qmtest.expected_outcome' : o })
-                    # Recursively remove tests that depend on d.
-                    self._UpdateDependentTests(d, Result.UNTESTED)
-                except ValueError:
-                    # This test has already been taken off the pending queue;
-                    # assume a result has already been recorded.  This can
-                    # happen when we're breaking dependency cycles.
-                    pass
-            else:
-                # Decrease the count associated with the node, if
-                # the test has not already been declared a failure.
-                n[0] -= 1
-                # If this was the last prerequisite, this test
-                # is now ready.
-                if n[0] == 0:
-                    self.__ready.append(d)
-                    
-    
-    def _AddResult(self, result):
-        """Report the result of running a test or resource.
-
-        'result' -- A 'Result' object representing the result of running
-        a test or resource."""
-
-        # Output a trace message.
-        self._Trace("Recording %s result for %s."
-                    % (result.GetKind(), result.GetId()))
-
-        # Find the target with the name indicated in the result.
-        if result.has_key(Result.TARGET):
-            for target in self.__targets:
-                if target.GetName() == result[Result.TARGET]:
-                    break
-        else:
-            # Not all results will have associated targets.  If the
-            # test was not run at all, there will be no associated
-            # target.
-            target = None
-
-        # Having no target is a rare occurrence; output a trace message.
-        if not target:
-            self._Trace("No target for %s." % result.GetId())
-                        
-        # Check for unexpected outcomes.
-        if result.GetKind() == Result.TEST  \
-           and (self.__expectations.get(result.GetId(), Result.PASS)
-                != result.GetOutcome()):
-            self.__any_unexpected_outcomes = 1
-            
-        # This target might now be idle.
-        if (target and target not in self.__idle_targets
-            and target.IsIdle()):
-            # Output a trace message.
-            self._Trace("Target is now idle.\n")
-            self.__idle_targets.append(target)
-
-        # Output a trace message.
-        self._Trace("Writing result for %s to streams." % result.GetId())
-
-        # Report the result.
-        for rs in self.__result_streams:
-            rs.WriteResult(result)
+        self.__num_tests_started += 1
+        self.__statuses[result.GetId()].MarkRunnable()
+        self._HandleResult(result)
 
 
     def _AddUntestedResult(self, test_name, cause, annotations={}):
@@ -482,7 +766,34 @@
         # Create the result.
         result = Result(Result.TEST, test_name, Result.UNTESTED, annotations)
         result[Result.CAUSE] = cause
-        self._AddResult(result)
+        self._AddErrorResult(result)
+
+
+    def _AddLoadErrorResult(self, test_id):
+        """Add a 'Result' indicating that loading 'test_id' failed.
+
+        Should be called from the catch block that caught the error, as
+        'Result.NoteException' is called."""
+
+        result = Result(Result.TEST, test_id)
+        result.NoteException(cause = "Could not load test.",
+                             outcome = Result.UNTESTED)
+        self._AddErrorResult(result)
+        
+
+    def _ClearAllRunnableTests(self, cause):
+        """Marks all currently runnable tests as UNTESTED.
+
+        This is called when it is detected that all tests currently in
+        the runnable queue can never by run, generally because their
+        target specification does not match any available target."""
+
+        for runnable_stack in self.__runnable.itervalues():
+            while runnable_stack:
+                self._AddUntestedResult(runnable_stack.pop(), cause)
+
+
+    ### Utility methods.
 
 
     def _Trace(self, message):


More information about the qmtest mailing list