[PATCH] MPI support for SerialAsync scheduler

Jeffrey D. Oldham oldham at codesourcery.com
Mon Jan 5 21:30:43 UTC 2004


Richard Guenther wrote:
> The patch was tested as usual.
> 
> Ok to commit?

I have some questions and comments interspersed below.

> Thanks, Richard.
> 
> 
> 2004Jan02  Richard Guenther <richard.guenther at uni-tuebingen.de>
> 
> 	* src/Threads/IterateSchedulers/SerialAsync.h: doxygenifize,
> 	add std::stack<int> for generation tracking, add support for
> 	asyncronous MPI requests.

Add an 'h' to spell asynchronous.

> 	src/Threads/IterateSchedulers/SerialAsync.cmpl.cpp: define
> 	new static variables.
> 
> --- /home/richard/src/pooma/cvs/r2/src/Threads/IterateSchedulers/SerialAsync.h	2000-06-09 00:16:50.000000000 +0200
> +++ Threads/IterateSchedulers/SerialAsync.h	2004-01-02 00:40:16.000000000 +0100
> @@ -42,48 +42,38 @@
>  // DataObject<SerialAsync>
>  //-----------------------------------------------------------------------------
> 
> -#include <iostream>
> -
>  #ifndef _SerialAsync_h_
>  #define _SerialAsync_h_
> -/*
> -LIBRARY:
> -        SerialAsync
> -
> -CLASSES: IterateScheduler
> -
> -CLASSES: DataObject
> -
> -CLASSES: Iterate
> -
> -OVERVIEW
> -        SerialAsync IterateScheduler is a policy template to create a
> -        dependence graphs and executes the graph respecting the
> -        dependencies without using threads. There is no parallelism,
> -        but Iterates may be executed out-of-order with respect to the
> -        program text.
> -
> ------------------------------------------------------------------------------*/
> -
> -//////////////////////////////////////////////////////////////////////
> 
> -//-----------------------------------------------------------------------------
> -// Overview:
> -// Smarts classes for times when you want no threads but you do want
> -// dataflow evaluation.
> -//-----------------------------------------------------------------------------
> -
> -//-----------------------------------------------------------------------------
> -// Typedefs:
> -//-----------------------------------------------------------------------------
> +/** @file
> + * @ingroup IterateSchedulers
> + * @brief
> + * Smarts classes for times when you want no threads but you do want
> + * dataflow evaluation.
> + *
> + * SerialAsync IterateScheduler is a policy template to create a
> + * dependence graphs and executes the graph respecting the
> + * dependencies without using threads.
> + * There is no (thread level) parallelism, but Iterates may be executed
> + * out-of-order with respect to the program text. Also this scheduler is
> + * used for message based parallelism in which case asyncronous execution
> + * leads to reduced communication latencies.
> + */
> 
>  //-----------------------------------------------------------------------------
>  // Includes:
>  //-----------------------------------------------------------------------------
> 
>  #include <list>
> +#include <vector>
> +#include <map>
> +#include <set>
> +#include <functional>
> +#include <stack>
>  #include "Threads/IterateSchedulers/IterateScheduler.h"
>  #include "Threads/IterateSchedulers/Runnable.h"
> +#include "Tulip/Messaging.h"
> +#include "Utilities/PAssert.h"
> 
>  //-----------------------------------------------------------------------------
>  // Forward Declarations:
> @@ -94,76 +84,261 @@
> 
>  namespace Smarts {
> 
> -#define MYID 0
> -#define MAX_CPUS 1
> -//
> -// Tag class for specializing IterateScheduler, Iterate and DataObject.
> -//
> +/**
> + * Tag class for specializing IterateScheduler, Iterate and DataObject.
> + */
> +
>  struct SerialAsync
>  {
> -  enum Action { Read, Write};
> +  enum Action { Read, Write };
>  };
> 
> 
> -//-----------------------------------------------------------------------------
> +/**
> + * Iterate<SerialAsync> is used to implement the SerialAsync
> + * scheduling policy.
> + *
> + * An Iterate is a non-blocking unit of concurrency that is used
> + * to describe a chunk of work. It inherits from the Runnable
> + * class and as all subclasses of Runnable, the user specializes
> + * the run() method to specify the operation.
> + * Iterate<SerialAsync> is a further specialization of the
> + * Iterate class to use the SerialAsync Scheduling algorithm to
> + * generate the data dependency graph for a data-driven
> + * execution.
> + */
> +
> +template<>
> +class Iterate<SerialAsync> : public Runnable
> +{
> +  friend class IterateScheduler<SerialAsync>;
> +  friend class DataObject<SerialAsync>;
> +
> +public:
> +
> +  typedef DataObject<SerialAsync> DataObject_t;
> +  typedef IterateScheduler<SerialAsync> IterateScheduler_t;
> +
> +
> +  /// The Constructor for this class takes the IterateScheduler and a
> +  /// CPU affinity.  CPU affinity has a default value of -1 which means
> +  /// it may run on any CPU available.
> +
> +  inline Iterate(IterateScheduler<SerialAsync> & s, int affinity=-1)
> +    : scheduler_m(s), notifications_m(1), generation_m(-1), togo_m(1)
> +  {}
> +
> +  /// The dtor is virtual because the subclasses will need to add to it.
> +
> +  virtual ~Iterate() {}
> +
> +  /// The run method does the core work of the Iterate.
> +  /// It is supplied by the subclass.
> +
> +  virtual void run() = 0;
> +
> +  //@name Stubs for the affinities
> +  /// There is no such thing in serial.
> +  //@{
> +
> +  inline int affinity() const {return 0;}
> +
> +  inline int hintAffinity() const {return 0;}
> +
> +  inline void affinity(int) {}
> +
> +  inline void hintAffinity(int) {}
> +
> +  //@}
> +
> +  /// Notify is used to indicate to the Iterate that one of the data
> +  /// objects it had requested has been granted. To do this, we dec a
> +  /// dependence counter which, if equal to 0, the Iterate is ready for
> +  /// execution.
> +
> +  void notify()
> +  {
> +    if (--notifications_m == 0)
> +      add(this);
> +  }
> +
> +  /// How many notifications remain?
> +
> +  int notifications() const { return notifications_m; }
> +
> +  void addNotification() { notifications_m++; }
> +
> +  int& generation() { return generation_m; }
> +
> +  int& togo() { return togo_m; }
> +
> +protected:
> +
> +  /// What scheduler are we working with?
> +  IterateScheduler<SerialAsync> &scheduler_m;
> +
> +  /// How many notifications should we receive before we can run?
> +  int notifications_m;
> +
> +  /// Which generation we were issued in.
> +  int generation_m;
> +
> +  /// How many times we need to go past a "did something" to be ready
> +  /// for destruction?
> +  int togo_m;
> +
> +};
> +
> +
> +/**
> + * FIXME.
> + */

I am wary of adding unfinished code to the code base.  At the very 
least, we need a more extensive comment describing what is not finished.

>  struct SystemContext
>  {
>    void addNCpus(int) {}
>    void wait() {}
>    void concurrency(int){}
> -  int concurrency() {return 1;}
> +  int concurrency() { return 1; }
>    void mustRunOn() {}
> 
>    // We have a separate message queue because they are
>    // higher priority.
> +  typedef Iterate<SerialAsync> *IteratePtr_t;
>    static std::list<RunnablePtr_t> workQueueMessages_m;
>    static std::list<RunnablePtr_t> workQueue_m;
> +#if POOMA_MPI
> +  static MPI_Request requests_m[1024];

What is this fixed constant of 1024?  Does this come from the MPI standard?

> +  static std::map<int, IteratePtr_t> allocated_requests_m;
> +  static std::set<int> free_requests_m;
> +#endif
> +
> +
> +#if POOMA_MPI
> 
> -  ///////////////////////////
> -  // This function lets you check if there are iterates that are
> -  // ready to run.
> -  inline static
> -  bool workReady()
> +  /// Query, if we have lots of MPI_Request slots available
> +
> +  static bool haveLotsOfMPIRequests()
>    {
> -    return !(workQueue_m.empty() && workQueueMessages_m.empty());
> +    return free_requests_m.size() > 1024/2;
>    }
> 
> -  ///////////////////////////
> -  // Run an iterate if one is ready.
> -  inline static
> -  void runSomething()
> +  /// Get a MPI_Request slot, associated with an iterate
> +
> +  static MPI_Request* getMPIRequest(IteratePtr_t p)
>    {
> -    if (!workQueueMessages_m.empty())
> -    {
> -      // Get the top iterate.
> -      // Delete it from the queue.
> -      // Run the iterate.
> -      // Delete the iterate.  This could put more iterates in the queue.
> +    PInsist(!free_requests_m.empty(), "No free MPIRequest slots.");
> +    int i = *free_requests_m.begin();
> +    free_requests_m.erase(free_requests_m.begin());
> +    allocated_requests_m[i] = p;
> +    p->togo()++;
> +    return &requests_m[i];
> +  }
> 
> -      RunnablePtr_t p = workQueueMessages_m.front();
> -      workQueueMessages_m.pop_front();
> -      p->execute();
> +  static void releaseMPIRequest(int i)
> +  {
> +    IteratePtr_t p = allocated_requests_m[i];
> +    allocated_requests_m.erase(i);
> +    free_requests_m.insert(i);
> +    if (--(p->togo()) == 0)
>        delete p;
> -    }
> +  }
> +
> +  static bool waitForSomeRequests(bool mayBlock)
> +  {
> +    if (allocated_requests_m.empty())
> +      return false;
> +
> +    int last_used_request = allocated_requests_m.rbegin()->first;
> +    int finished[last_used_request+1];
> +    MPI_Status statuses[last_used_request+1];
> +    int nr_finished;
> +    int res;
> +    if (mayBlock)
> +      res = MPI_Waitsome(last_used_request+1, requests_m,
> +			 &nr_finished, finished, statuses);
>      else
> -    {
> -      if (!workQueue_m.empty())
> -      {
> -	RunnablePtr_t p = workQueue_m.front();
> -	workQueue_m.pop_front();
> -	p->execute();
> -	delete p;
> +      res = MPI_Testsome(last_used_request+1, requests_m,
> +			 &nr_finished, finished, statuses);
> +    PAssert(res == MPI_SUCCESS || res == MPI_ERR_IN_STATUS);
> +    if (nr_finished == MPI_UNDEFINED)
> +      return false;
> +
> +    // release finised requests
> +    while (nr_finished--) {
> +      if (res == MPI_ERR_IN_STATUS) {
> +	if (statuses[nr_finished].MPI_ERROR != MPI_SUCCESS) {
> +	  char msg[MPI_MAX_ERROR_STRING+1];
> +	  int len;
> +	  MPI_Error_string(statuses[nr_finished].MPI_ERROR, msg, &len);
> +	  msg[len] = '\0';
> +	  PInsist(0, msg);
> +	}
>        }
> +      releaseMPIRequest(finished[nr_finished]);
>      }
> +    return true;
> +  }
> +
> +#else
> +
> +  static bool waitForSomeRequests(bool mayBlock)
> +  {
> +    return false;
> +  }
> +
> +#endif
> +
> +
> +  /// This function lets you check if there are iterates that are
> +  /// ready to run.
> +
> +  static bool workReady()
> +  {
> +    return !(workQueue_m.empty()
> +	     && workQueueMessages_m.empty()
> +#if POOMA_MPI
> +	     && allocated_requests_m.empty()
> +#endif
> +	     );
> +  }
> +
> +  /// Run an iterate if one is ready.  Returns if progress
> +  /// was made.
> +
> +  static bool runSomething(bool mayBlock = true)
> +  {
> +    // do work in this order to minimize communication latency:
> +    // - issue all messages
> +    // - do some regular work
> +    // - wait for messages to complete
> +
> +    RunnablePtr_t p = NULL;
> +    if (!workQueueMessages_m.empty()) {
> +      p = workQueueMessages_m.front();
> +      workQueueMessages_m.pop_front();
> +    } else if (!workQueue_m.empty()) {
> +      p = workQueue_m.front();
> +      workQueue_m.pop_front();
> +    }
> +
> +    if (p) {
> +      p->execute();
> +      Iterate<SerialAsync> *it = dynamic_cast<IteratePtr_t>(p);
> +      if (it) {
> +	if (--(it->togo()) == 0)
> +	  delete it;
> +      } else
> +	delete p;
> +      return true;
> +
> +    } else
> +      return waitForSomeRequests(mayBlock);
>    }
> 
>  };
> 
> -inline void addRunnable(RunnablePtr_t rn)
> -{
> -  SystemContext::workQueue_m.push_front(rn);
> -}
> +/// Adds a runnable to the appropriate work-queue.
> 
>  inline void add(RunnablePtr_t rn)
>  {
> @@ -182,25 +357,18 @@
>  inline  void wait() {}
>  inline  void mustRunOn(){}
> 
> -/*------------------------------------------------------------------------
> -CLASS
> -	IterateScheduler_Serial_Async
> -
> -	Implements a asynchronous scheduler for a data driven execution.
> -	Specializes a IterateScheduler.
> -
> -KEYWORDS
> -	Data-parallelism, Native-interface, IterateScheduler.
> -
> -DESCRIPTION
> -
> -        The SerialAsync IterateScheduler, Iterate and DataObject
> -	implement a SMARTS scheduler that does dataflow without threads.
> -	What that means is that when you hand iterates to the
> -	IterateScheduler it stores them up until you call
> -	IterateScheduler::blockingEvaluate(), at which point it evaluates
> -	iterates until the queue is empty.
> ------------------------------------------------------------------------------*/
> +
> +/**
> + * Implements a asynchronous scheduler for a data driven execution.
> + * Specializes a IterateScheduler.
> + *
> + * The SerialAsync IterateScheduler, Iterate and DataObject
> + * implement a SMARTS scheduler that does dataflow without threads.
> + * What that means is that when you hand iterates to the
> + * IterateScheduler it stores them up until you call
> + * IterateScheduler::blockingEvaluate(), at which point it evaluates
> + * iterates until the queue is empty.
> + */
> 
>  template<>
>  class IterateScheduler<SerialAsync>
> @@ -212,196 +380,128 @@
>    typedef DataObject<SerialAsync> DataObject_t;
>    typedef Iterate<SerialAsync> Iterate_t;
> 
> -  ///////////////////////////
> -  // Constructor
> -  //
> -  IterateScheduler() {}
> -
> -  ///////////////////////////
> -  // Destructor
> -  //
> -  ~IterateScheduler() {}
> -  void setConcurrency(int) {}
> -
> -  //---------------------------------------------------------------------------
> -  // Mutators.
> -  //---------------------------------------------------------------------------
> -
> -  ///////////////////////////
> -  // Tells the scheduler that the parser thread is starting a new
> -  // data-parallel statement.  Any Iterate that is handed off to the
> -  // scheduler between beginGeneration() and endGeneration() belongs
> -  // to the same data-paralllel statement and therefore has the same
> -  // generation number.
> -  //
> -  inline void beginGeneration() { }
> -
> -  ///////////////////////////
> -  // Tells the scheduler that no more Iterates will be handed off for
> -  // the data parallel statement that was begun with a
> -  // beginGeneration().
> -  //
> -  inline void endGeneration() {}
> -
> -  ///////////////////////////
> -  // The parser thread calls this method to evaluate the generated
> -  // graph until all the nodes in the dependence graph has been
> -  // executed by the scheduler.  That is to say, the scheduler
> -  // executes all the Iterates that has been handed off to it by the
> -  // parser thread.
> -  //
> -  inline
> -  void blockingEvaluate();
> -
> -  ///////////////////////////
> -  // The parser thread calls this method to ask the scheduler to run
> -  // the given Iterate when the dependence on that Iterate has been
> -  // satisfied.
> -  //
> -  inline void handOff(Iterate<SerialAsync>* it);
> +  IterateScheduler()
> +    : generation_m(0)
> +  {}
> 
> -  inline
> -  void releaseIterates() { }
> +  ~IterateScheduler() {}
> 
> -protected:
> -private:
> +  void setConcurrency(int) {}
> 
> -  typedef std::list<Iterate_t*> Container_t;
> -  typedef Container_t::iterator Iterator_t;
> +  /// Tells the scheduler that the parser thread is starting a new
> +  /// data-parallel statement.  Any Iterate that is handed off to the
> +  /// scheduler between beginGeneration() and endGeneration() belongs
> +  /// to the same data-paralllel statement and therefore has the same
> +  /// generation number.
> +  /// Nested invocations are handled as being part of the outermost
> +  /// generation.
> 
> -};
> +  void beginGeneration()
> +  {
> +    // Ensure proper overflow behavior.
> +    if (++generation_m < 0)
> +      generation_m = 0;
> +    generationStack_m.push(generation_m);
> +  }
> 
> -//-----------------------------------------------------------------------------
> +  /// Tells the scheduler that no more Iterates will be handed off for
> +  /// the data parallel statement that was begun with a
> +  /// beginGeneration().
> 
> -/*------------------------------------------------------------------------
> -CLASS
> -	Iterate_SerialAsync
> -
> -	Iterate<SerialAsync> is used to implement the SerialAsync
> -	scheduling policy.
> -
> -KEYWORDS
> -	Data_Parallelism, Native_Interface, IterateScheduler, Data_Flow.
> -
> -DESCRIPTION
> -        An Iterate is a non-blocking unit of concurrency that is used
> -	to describe a chunk of work. It inherits from the Runnable
> -	class and as all subclasses of Runnable, the user specializes
> -	the run() method to specify the operation.
> -	Iterate<SerialAsync> is a further specialization of the
> -	Iterate class to use the SerialAsync Scheduling algorithm to
> -	generate the data dependency graph for a data-driven
> -	execution.  */
> +  void endGeneration()
> +  {
> +    PAssert(inGeneration());
> +    generationStack_m.pop();
> 
> -template<>
> -class Iterate<SerialAsync> : public Runnable
> -{
> -  friend class IterateScheduler<SerialAsync>;
> -  friend class DataObject<SerialAsync>;
> +#if POOMA_MPI
> +    // this is a safe point to block until we have "lots" of MPI Requests
> +    if (!inGeneration())
> +      while (!SystemContext::haveLotsOfMPIRequests())
> +	SystemContext::runSomething(true);
> +#endif
> +  }
> 
> -public:
> +  /// Wether we are inside a generation and may not safely block.
> 
> -  typedef DataObject<SerialAsync> DataObject_t;
> -  typedef IterateScheduler<SerialAsync> IterateScheduler_t;
> +  bool inGeneration() const
> +  {
> +    return !generationStack_m.empty();
> +  }
> 
> +  /// What the current generation is.
> 
> -  ///////////////////////////
> -  // The Constructor for this class takes the IterateScheduler and a
> -  // CPU affinity.  CPU affinity has a default value of -1 which means
> -  // it may run on any CPU available.
> -  //
> -  inline Iterate(IterateScheduler<SerialAsync> & s, int affinity=-1);
> -
> -  ///////////////////////////
> -  // The dtor is virtual because the subclasses will need to add to it.
> -  //
> -  virtual ~Iterate() {}
> +  int generation() const
> +  {
> +    if (!inGeneration())
> +      return -1;
> +    return generationStack_m.top();
> +  }
> 
> -  ///////////////////////////
> -  // The run method does the core work of the Iterate.
> -  // It is supplied by the subclass.
> -  //
> -  virtual void run() = 0;
> +  /// The parser thread calls this method to evaluate the generated
> +  /// graph until all the nodes in the dependence graph has been
> +  /// executed by the scheduler.  That is to say, the scheduler
> +  /// executes all the Iterates that has been handed off to it by the
> +  /// parser thread.
> 
> -  ///////////////////////////
> -  // Stub in all the affinities, because there is no such thing
> -  // in serial.
> -  //
> -  inline int affinity() const {return 0;}
> -  ///////////////////////////
> -  // Stub in all the affinities, because there is no such thing
> -  // in serial.
> -  //
> -  inline int hintAffinity() const {return 0;}
> -  ///////////////////////////
> -  // Stub in all the affinities, because there is no such thing
> -  // in serial.
> -  //
> -  inline void affinity(int) {}
> -  ///////////////////////////
> -  // Stub in all the affinities, because there is no such thing
> -  // in serial.
> -  //
> -  inline void hintAffinity(int) {}
> +  void blockingEvaluate()
> +  {
> +    if (inGeneration()) {
> +      // It's not safe to block inside a generation, so
> +      // just do as much as we can without blocking.
> +      while (SystemContext::runSomething(false))
> +	;
> +
> +    } else {
> +      // Loop as long as there is anything in the queue.
> +      while (SystemContext::workReady())
> +        SystemContext::runSomething(true);
> +    }
> +  }
> 
> -  ///////////////////////////
> -  // Notify is used to indicate to the Iterate that one of the data
> -  // objects it had requested has been granted. To do this, we dec a
> -  // dependence counter which, if equal to 0, the Iterate is ready for
> -  // execution.
> -  //
> -  inline void notify();
> -
> -  ///////////////////////////
> -  // How many notifications remain?
> -  //
> -  inline
> -  int notifications() const { return notifications_m; }
> +  /// The parser thread calls this method to ask the scheduler to run
> +  /// the given Iterate when the dependence on that Iterate has been
> +  /// satisfied.
> 
> -  inline void addNotification()
> +  void handOff(Iterate<SerialAsync>* it)
>    {
> -    notifications_m++;
> +    // No action needs to be taken here.  Iterates will make their
> +    // own way into the execution queue.
> +    it->generation() = generation();
> +    it->notify();
>    }
> 
> -protected:
> +  void releaseIterates() { }
> 
> -  // What scheduler are we working with?
> -  IterateScheduler<SerialAsync> &scheduler_m;
> +private:
> 
> -  // How many notifications should we receive before we can run?
> -  int notifications_m;
> +  typedef std::list<Iterate_t*> Container_t;
> +  typedef Container_t::iterator Iterator_t;
> 
> -private:
> -  // Set notifications dynamically and automatically every time a
> -  // request is made by the iterate
> -  void incr_notifications() { notifications_m++;}
> +  static std::stack<int> generationStack_m;
> +  int generation_m;
> 
>  };
> 
> 
> -//-----------------------------------------------------------------------------
> -
> -/*------------------------------------------------------------------------
> -CLASS
> -	DataObject_SerialAsync
> -
> -	Implements a asynchronous scheduler for a data driven execution.
> -KEYWORDS
> -	Data-parallelism, Native-interface, IterateScheduler.
> -
> -DESCRIPTION
> -        The DataObject Class is used introduce a type to represent
> -	a resources (normally) blocks of data) that Iterates contend
> -	for atomic access. Iterates make request for either a read or
> -	write to the DataObjects. DataObjects may grant the request if
> -	the object is currently available. Otherwise, the request is
> -	enqueue in a queue private to the data object until the
> -	DataObject is release by another Iterate. A set of read
> -	requests may be granted all at once if there are no
> -	intervening write request to that DataObject.
> -	DataObject<SerialAsync> is a specialization of DataObject for
> -	the policy template SerialAsync.
> -*/
> +/**
> + * Implements a asynchronous scheduler for a data driven execution.
> + *
> + * The DataObject Class is used introduce a type to represent
> + * a resources (normally) blocks of data) that Iterates contend
> + * for atomic access. Iterates make request for either a read or
> + * write to the DataObjects. DataObjects may grant the request if
> + * the object is currently available. Otherwise, the request is
> + * enqueue in a queue private to the data object until the
> + * DataObject is release by another Iterate. A set of read
> + * requests may be granted all at once if there are no
> + * intervening write request to that DataObject.
> + * DataObject<SerialAsync> is a specialization of DataObject for
> + * the policy template SerialAsync.
> + *
> + * There are two ways data can be used: to read or to write.
> + * Don't change this to give more than two states;
> + * things inside depend on that.
> + */
> 
>  template<>
>  class DataObject<SerialAsync>
> @@ -413,54 +513,56 @@
>    typedef IterateScheduler<SerialAsync> IterateScheduler_t;
>    typedef Iterate<SerialAsync> Iterate_t;
> 
> -  // There are two ways data can be used: to read or to write.
> -  // Don't change this to give more than two states:
> -  // things inside depend on that.
> -
> -  ///////////////////////////
> -  // Construct the data object with an empty set of requests
> -  // and the given affinity.
> -  //
> -  inline DataObject(int affinity=-1);
> +
> +  /// Construct the data object with an empty set of requests
> +  /// and the given affinity.
> +
> +  DataObject(int affinity=-1)
> +    : released_m(queue_m.end()), notifications_m(0)
> +  {
> +    // released_m to the end of the queue (which should) also be the
> +    // beginning.  notifications_m to zero, since nothing has been
> +    // released yet.
> +  }
> 
> -  ///////////////////////////
> -  // for compatibility with other SMARTS schedulers, accept
> -  // Scheduler arguments (unused)
> -  //
> -  inline
> -  DataObject(int affinity, IterateScheduler<SerialAsync>&);
> -
> -  ///////////////////////////
> -  // Stub out affinity because there is no affinity in serial.
> -  //
> -  inline int affinity() const { return 0; }
> -
> -  ///////////////////////////
> -  // Stub out affinity because there is no affinity in serial.
> -  //
> -  inline void affinity(int) {}
> +  /// for compatibility with other SMARTS schedulers, accept
> +  /// Scheduler arguments (unused)
> 
> -  ///////////////////////////
> -  // An iterate makes a request for a certain action in a certain
> -  // generation.
> -  //
> -  inline
> -  void request(Iterate<SerialAsync>&, SerialAsync::Action);
> -
> -  ///////////////////////////
> -  // An iterate finishes and tells the DataObject it no longer needs
> -  // it.  If this is the last release for the current set of
> -  // requests, have the IterateScheduler release some more.
> -  //
> -  inline void release(SerialAsync::Action);
> +  inline DataObject(int affinity, IterateScheduler<SerialAsync>&)
> +    : released_m(queue_m.end()), notifications_m(0)
> +  {}
> +
> +  /// Stub out affinity because there is no affinity in serial.
> +
> +  int affinity() const { return 0; }
> +
> +  /// Stub out affinity because there is no affinity in serial.
> +
> +  void affinity(int) {}
> +
> +  /// An iterate makes a request for a certain action in a certain
> +  /// generation.
> +
> +  inline void request(Iterate<SerialAsync>&, SerialAsync::Action);
> +
> +  /// An iterate finishes and tells the DataObject it no longer needs
> +  /// it.  If this is the last release for the current set of
> +  /// requests, have the IterateScheduler release some more.
> +
> +  void release(SerialAsync::Action)
> +  {
> +    if (--notifications_m == 0)
> +      releaseIterates();
> +  }
> 
> -protected:
>  private:
> 
> -  // If release needs to let more iterates go, it calls this.
> +  /// If release needs to let more iterates go, it calls this.
>    inline void releaseIterates();
> 
> -  // The type for a request.
> +  /**
> +   * The type for a request.
> +   */
>    class Request
>    {
>    public:
> @@ -475,135 +577,27 @@
>      SerialAsync::Action act_m;
>    };
> 
> -  // The type of the queue and iterator.
> +  /// The type of the queue and iterator.
>    typedef std::list<Request> Container_t;
>    typedef Container_t::iterator Iterator_t;
> 
> -  // The list of requests from various iterates.
> -  // They're granted in FIFO order.
> +  /// The list of requests from various iterates.
> +  /// They're granted in FIFO order.
>    Container_t queue_m;
> 
> -  // Pointer to the last request that has been granted.
> +  /// Pointer to the last request that has been granted.
>    Iterator_t released_m;
> 
> -  // The number of outstanding notifications.
> +  /// The number of outstanding notifications.
>    int notifications_m;
>  };
> 
> -//////////////////////////////////////////////////////////////////////
> -//
> -// Inline implementation of the functions for
> -// IterateScheduler<SerialAsync>
> -//
> -//////////////////////////////////////////////////////////////////////
> -
> -//
> -// IterateScheduler<SerialAsync>::handOff(Iterate<SerialAsync>*)
> -// No action needs to be taken here.  Iterates will make their
> -// own way into the execution queue.
> -//
> -
> -inline void
> -IterateScheduler<SerialAsync>::handOff(Iterate<SerialAsync>* it)
> -{
> -  it->notify();
> -}
> -
> -//////////////////////////////////////////////////////////////////////
> -//
> -// Inline implementation of the functions for Iterate<SerialAsync>
> -//
> -//////////////////////////////////////////////////////////////////////
> -
> -//
> -// Iterate<SerialAsync>::Iterate
> -// Construct with the scheduler and the number of notifications.
> -// Ignore the affinity.
> -//
> -
> -inline
> -Iterate<SerialAsync>::Iterate(IterateScheduler<SerialAsync>& s, int)
> -: scheduler_m(s), notifications_m(1)
> -{
> -}
> -
> -//
> -// Iterate<SerialAsync>::notify
> -// Notify the iterate that a DataObject is ready.
> -// Decrement the counter, and if it is zero, alert the scheduler.
> -//
> -
> -inline void
> -Iterate<SerialAsync>::notify()
> -{
> -  if ( --notifications_m == 0 )
> -  {
> -    add(this);
> -  }
> -}
> -
> -//////////////////////////////////////////////////////////////////////
> -//
> -// Inline implementation of the functions for DataObject<SerialAsync>
> -//
> -//////////////////////////////////////////////////////////////////////
> -
> -//
> -// DataObject::DataObject()
> -// Initialize:
> -//   released_m to the end of the queue (which should) also be the
> -//   beginning.  notifications_m to zero, since nothing has been
> -//   released yet.
> -//
> -
> -inline
> -DataObject<SerialAsync>::DataObject(int)
> -: released_m(queue_m.end()), notifications_m(0)
> -{
> -}
> -
> -//
> -// void DataObject::release(Action)
> -// An iterate has finished and is telling the DataObject that
> -// it is no longer needed.
> -//
> +/// void DataObject::releaseIterates(SerialAsync::Action)
> +/// When the last released iterate dies, we need to
> +/// look at the beginning of the queue and tell more iterates
> +/// that they can access this data.
> 
>  inline void
> -DataObject<SerialAsync>::release(SerialAsync::Action)
> -{
> -  if ( --notifications_m == 0 )
> -    releaseIterates();
> -}
> -
> -
> -
> -//-----------------------------------------------------------------------------
> -//
> -// void IterateScheduler<SerialAsync>::blockingEvaluate
> -// Evaluate all the iterates in the queue.
> -//
> -//-----------------------------------------------------------------------------
> -inline
> -void
> -IterateScheduler<SerialAsync>::blockingEvaluate()
> -{
> -  // Loop as long as there is anything in the queue.
> -  while (SystemContext::workReady())
> -  {
> -    SystemContext::runSomething();
> -  }
> -}
> -
> -//-----------------------------------------------------------------------------
> -//
> -// void DataObject::releaseIterates(SerialAsync::Action)
> -// When the last released iterate dies, we need to
> -// look at the beginning of the queue and tell more iterates
> -// that they can access this data.
> -//
> -//-----------------------------------------------------------------------------
> -inline
> -void
>  DataObject<SerialAsync>::releaseIterates()
>  {
>    // Get rid of the reservations that have finished.
> @@ -622,14 +616,17 @@
>        released_m->iterate().notify();
>        ++notifications_m;
> 
> -      // Record what action that one will take.
> +      // Record what action that one will take
> +      // and record its generation number
>        SerialAsync::Action act = released_m->act();
> +      int generation = released_m->iterate().generation();
> 
>        // Look at the next iterate.
>        ++released_m;
> 
>        // If the first one was a read, release more.
>        if ( act == SerialAsync::Read )
> +	{
> 
>          // As long as we aren't at the end and we have more reads...
>          while ((released_m != end) &&
> @@ -642,29 +639,30 @@
>              // And go on to the next.
>              ++released_m;
>            }
> +
> +	}
> +
>      }
>  }
> 
> +/// void DataObject::request(Iterate&, action)
> +/// An iterate makes a reservation with this DataObject for a given
> +/// action in a given generation.  The request may be granted
> +/// immediately.
> 
> -//
> -// void DataObject::request(Iterate&, action)
> -// An iterate makes a reservation with this DataObject for a given
> -// action in a given generation.  The request may be granted
> -// immediately.
> -//
> -inline
> -void
> +inline void
>  DataObject<SerialAsync>::request(Iterate<SerialAsync>& it,
>                                   SerialAsync::Action act)
> 
>  {
>    // The request can be granted immediately if:
>    // The queue is currently empty, or
> -  // The request is a read and everything in the queue is a read.
> +  // the request is a read and everything in the queue is a read,
> +  // or (with relaxed conditions), everything is the same generation.
> 
>    // Set notifications dynamically and automatically
>    //     every time a request is made by the iterate
> -  it.incr_notifications();
> +  it.notifications_m++;
> 
>    bool allReleased = (queue_m.end() == released_m);
>    bool releasable =  queue_m.empty() ||
> @@ -691,17 +689,11 @@
>  }
> 
> 
> -//----------------------------------------------------------------------
> -
> -
> -//
> -// End of Smarts namespace.
> -//
> -}
> +} // namespace Smarts
> 
>  //////////////////////////////////////////////////////////////////////
> 
> -#endif     // POOMA_PACKAGE_CLASS_H
> +#endif     // _SerialAsync_h_
> 
>  /***************************************************************************
>   * $RCSfile: SerialAsync.h,v $   $Author: sa_smith $
> --- /home/richard/src/pooma/cvs/r2/src/Threads/IterateSchedulers/SerialAsync.cmpl.cpp	2000-04-12 02:08:06.000000000 +0200
> +++ Threads/IterateSchedulers/SerialAsync.cmpl.cpp	2004-01-02 00:40:16.000000000 +0100
> @@ -82,6 +82,12 @@
> 
>  std::list<RunnablePtr_t> SystemContext::workQueueMessages_m;
>  std::list<RunnablePtr_t> SystemContext::workQueue_m;
> +#if POOMA_MPI
> +  MPI_Request SystemContext::requests_m[1024];
> +  std::map<int, SystemContext::IteratePtr_t> SystemContext::allocated_requests_m;
> +  std::set<int> SystemContext::free_requests_m;
> +#endif
> +std::stack<int> IterateScheduler<SerialAsync>::generationStack_m;
> 
>  }
> 


-- 
Jeffrey D. Oldham
oldham at codesourcery.com




More information about the pooma-dev mailing list