[pooma-dev] [PATCH] MPI support for SerialAsync scheduler
Richard Guenther
rguenth at tat.physik.uni-tuebingen.de
Tue Jan 6 19:58:33 UTC 2004
On Tue, 6 Jan 2004, Jeffrey D. Oldham wrote:
> Let's move the magic constant into a const variable instead of having
> the constant scattered throughout the code. Then, please commit. Thanks.
For the record, this is what I committed. It passes builds for both
--serial and --mpi for me.
Richard.
2004Jan06 Richard Guenther <richard.guenther at uni-tuebingen.de>
* src/Threads/IterateSchedulers/SerialAsync.h: doxygenifize,
add std::stack<int> for generation tracking, add support for
asynchronous MPI requests.
src/Threads/IterateSchedulers/SerialAsync.cmpl.cpp: define
new static variables.
src/Threads/IterateSchedulers/Runnable.h: declare add().
src/Pooma/Pooma.cmpl.cpp: use SystemContext::max_requests
constant.
Index: Pooma/Pooma.cmpl.cpp
===================================================================
RCS file: /home/pooma/Repository/r2/src/Pooma/Pooma.cmpl.cpp,v
retrieving revision 1.40
diff -u -u -r1.40 Pooma.cmpl.cpp
--- Pooma/Pooma.cmpl.cpp 5 Jan 2004 22:34:33 -0000 1.40
+++ Pooma/Pooma.cmpl.cpp 6 Jan 2004 19:52:47 -0000
@@ -354,8 +354,7 @@
#if POOMA_MPI
MPI_Comm_rank(MPI_COMM_WORLD, &myContext_g);
MPI_Comm_size(MPI_COMM_WORLD, &numContexts_g);
- // ugh...
- for (int i=0; i<sizeof(Smarts::SystemContext::requests_m)/sizeof(MPI_Request); ++i)
+ for (int i=0; i<Smarts::SystemContext::max_requests; ++i)
Smarts::SystemContext::free_requests_m.insert(i);
#elif POOMA_CHEETAH
PAssert(controller_g != 0);
Index: Threads/IterateSchedulers/Runnable.h
===================================================================
RCS file: /home/pooma/Repository/r2/src/Threads/IterateSchedulers/Runnable.h,v
retrieving revision 1.4
diff -u -u -r1.4 Runnable.h
--- Threads/IterateSchedulers/Runnable.h 8 Jun 2000 22:16:50 -0000 1.4
+++ Threads/IterateSchedulers/Runnable.h 6 Jan 2004 19:52:47 -0000
@@ -125,5 +125,10 @@
typedef Runnable *RunnablePtr_t;
+/// Schedulers need to implement this function to add
+/// a runnable to the execution queue.
+
+inline void add(RunnablePtr_t);
+
}
#endif
Index: Threads/IterateSchedulers/SerialAsync.cmpl.cpp
===================================================================
RCS file: /home/pooma/Repository/r2/src/Threads/IterateSchedulers/SerialAsync.cmpl.cpp,v
retrieving revision 1.3
diff -u -u -r1.3 SerialAsync.cmpl.cpp
--- Threads/IterateSchedulers/SerialAsync.cmpl.cpp 12 Apr 2000 00:08:06 -0000 1.3
+++ Threads/IterateSchedulers/SerialAsync.cmpl.cpp 6 Jan 2004 19:52:47 -0000
@@ -82,6 +82,13 @@
std::list<RunnablePtr_t> SystemContext::workQueueMessages_m;
std::list<RunnablePtr_t> SystemContext::workQueue_m;
+#if POOMA_MPI
+ const int SystemContext::max_requests;
+ MPI_Request SystemContext::requests_m[SystemContext::max_requests];
+ std::map<int, SystemContext::IteratePtr_t> SystemContext::allocated_requests_m;
+ std::set<int> SystemContext::free_requests_m;
+#endif
+std::stack<int> IterateScheduler<SerialAsync>::generationStack_m;
}
Index: Threads/IterateSchedulers/SerialAsync.h
===================================================================
RCS file: /home/pooma/Repository/r2/src/Threads/IterateSchedulers/SerialAsync.h,v
retrieving revision 1.9
diff -u -u -r1.9 SerialAsync.h
--- Threads/IterateSchedulers/SerialAsync.h 8 Jun 2000 22:16:50 -0000 1.9
+++ Threads/IterateSchedulers/SerialAsync.h 6 Jan 2004 19:52:48 -0000
@@ -42,48 +42,38 @@
// DataObject<SerialAsync>
//-----------------------------------------------------------------------------
-#include <iostream>
-
#ifndef _SerialAsync_h_
#define _SerialAsync_h_
-/*
-LIBRARY:
- SerialAsync
-
-CLASSES: IterateScheduler
-
-CLASSES: DataObject
-
-CLASSES: Iterate
-
-OVERVIEW
- SerialAsync IterateScheduler is a policy template to create a
- dependence graphs and executes the graph respecting the
- dependencies without using threads. There is no parallelism,
- but Iterates may be executed out-of-order with respect to the
- program text.
-
------------------------------------------------------------------------------*/
-
-//////////////////////////////////////////////////////////////////////
-//-----------------------------------------------------------------------------
-// Overview:
-// Smarts classes for times when you want no threads but you do want
-// dataflow evaluation.
-//-----------------------------------------------------------------------------
-
-//-----------------------------------------------------------------------------
-// Typedefs:
-//-----------------------------------------------------------------------------
+/** @file
+ * @ingroup IterateSchedulers
+ * @brief
+ * Smarts classes for times when you want no threads but you do want
+ * dataflow evaluation.
+ *
+ * SerialAsync IterateScheduler is a policy template that creates a
+ * dependence graph and executes the graph respecting the
+ * dependencies without using threads.
+ * There is no (thread level) parallelism, but Iterates may be executed
+ * out-of-order with respect to the program text. Also this scheduler is
+ * used for message based parallelism in which case asynchronous execution
+ * leads to reduced communication latencies.
+ */
//-----------------------------------------------------------------------------
// Includes:
//-----------------------------------------------------------------------------
#include <list>
+#include <vector>
+#include <map>
+#include <set>
+#include <functional>
+#include <stack>
#include "Threads/IterateSchedulers/IterateScheduler.h"
#include "Threads/IterateSchedulers/Runnable.h"
+#include "Tulip/Messaging.h"
+#include "Utilities/PAssert.h"
//-----------------------------------------------------------------------------
// Forward Declarations:
@@ -94,76 +84,258 @@
namespace Smarts {
-#define MYID 0
-#define MAX_CPUS 1
-//
-// Tag class for specializing IterateScheduler, Iterate and DataObject.
-//
+/**
+ * Tag class for specializing IterateScheduler, Iterate and DataObject.
+ */
+
struct SerialAsync
{
- enum Action { Read, Write};
+ enum Action { Read, Write };
};
-//-----------------------------------------------------------------------------
+/**
+ * Iterate<SerialAsync> is used to implement the SerialAsync
+ * scheduling policy.
+ *
+ * An Iterate is a non-blocking unit of concurrency that is used
+ * to describe a chunk of work. It inherits from the Runnable
+ * class and as all subclasses of Runnable, the user specializes
+ * the run() method to specify the operation.
+ * Iterate<SerialAsync> is a further specialization of the
+ * Iterate class to use the SerialAsync Scheduling algorithm to
+ * generate the data dependency graph for a data-driven
+ * execution.
+ */
+
+template<>
+class Iterate<SerialAsync> : public Runnable
+{
+ friend class IterateScheduler<SerialAsync>;
+ friend class DataObject<SerialAsync>;
+
+public:
+
+ typedef DataObject<SerialAsync> DataObject_t;
+ typedef IterateScheduler<SerialAsync> IterateScheduler_t;
+
+
+ /// The Constructor for this class takes the IterateScheduler and a
+ /// CPU affinity. CPU affinity has a default value of -1 which means
+ /// it may run on any CPU available.
+
+ inline Iterate(IterateScheduler<SerialAsync> & s, int affinity=-1)
+ : scheduler_m(s), notifications_m(1), generation_m(-1), togo_m(1)
+ {}
+
+ /// The dtor is virtual because the subclasses will need to add to it.
+
+ virtual ~Iterate() {}
+
+ /// The run method does the core work of the Iterate.
+ /// It is supplied by the subclass.
+
+ virtual void run() = 0;
+
+ //@name Stubs for the affinities
+ /// There is no such thing in serial.
+ //@{
+
+ inline int affinity() const {return 0;}
+
+ inline int hintAffinity() const {return 0;}
+
+ inline void affinity(int) {}
+
+ inline void hintAffinity(int) {}
+
+ //@}
+
+ /// Notify is used to indicate to the Iterate that one of the data
+ /// objects it had requested has been granted. To do this, we decrement a
+ /// dependence counter; when it reaches 0, the Iterate is ready for
+ /// execution.
+
+ void notify()
+ {
+ if (--notifications_m == 0)
+ add(this);
+ }
+
+ /// How many notifications remain?
+
+ int notifications() const { return notifications_m; }
+
+ void addNotification() { notifications_m++; }
+
+ int& generation() { return generation_m; }
+
+ int& togo() { return togo_m; }
+
+protected:
+
+ /// What scheduler are we working with?
+ IterateScheduler<SerialAsync> &scheduler_m;
+
+ /// How many notifications should we receive before we can run?
+ int notifications_m;
+
+ /// Which generation we were issued in.
+ int generation_m;
+
+ /// How many times we need to go past a "did something" to be ready
+ /// for destruction?
+ int togo_m;
+
+};
+
struct SystemContext
{
void addNCpus(int) {}
void wait() {}
void concurrency(int){}
- int concurrency() {return 1;}
+ int concurrency() { return 1; }
void mustRunOn() {}
// We have a separate message queue because they are
// higher priority.
+ typedef Iterate<SerialAsync> *IteratePtr_t;
static std::list<RunnablePtr_t> workQueueMessages_m;
static std::list<RunnablePtr_t> workQueue_m;
+#if POOMA_MPI
+ static const int max_requests = 1024;
+ static MPI_Request requests_m[max_requests];
+ static std::map<int, IteratePtr_t> allocated_requests_m;
+ static std::set<int> free_requests_m;
+#endif
+
+
+#if POOMA_MPI
+
+ /// Query, if we have lots of MPI_Request slots available
- ///////////////////////////
- // This function lets you check if there are iterates that are
- // ready to run.
- inline static
- bool workReady()
+ static bool haveLotsOfMPIRequests()
{
- return !(workQueue_m.empty() && workQueueMessages_m.empty());
+ return free_requests_m.size() > max_requests/2;
}
- ///////////////////////////
- // Run an iterate if one is ready.
- inline static
- void runSomething()
+ /// Get a MPI_Request slot, associated with an iterate
+
+ static MPI_Request* getMPIRequest(IteratePtr_t p)
{
- if (!workQueueMessages_m.empty())
- {
- // Get the top iterate.
- // Delete it from the queue.
- // Run the iterate.
- // Delete the iterate. This could put more iterates in the queue.
+ PInsist(!free_requests_m.empty(), "No free MPIRequest slots.");
+ int i = *free_requests_m.begin();
+ free_requests_m.erase(free_requests_m.begin());
+ allocated_requests_m[i] = p;
+ p->togo()++;
+ return &requests_m[i];
+ }
- RunnablePtr_t p = workQueueMessages_m.front();
- workQueueMessages_m.pop_front();
- p->execute();
+ static void releaseMPIRequest(int i)
+ {
+ IteratePtr_t p = allocated_requests_m[i];
+ allocated_requests_m.erase(i);
+ free_requests_m.insert(i);
+ if (--(p->togo()) == 0)
delete p;
- }
+ }
+
+ static bool waitForSomeRequests(bool mayBlock)
+ {
+ if (allocated_requests_m.empty())
+ return false;
+
+ int last_used_request = allocated_requests_m.rbegin()->first;
+ int finished[last_used_request+1];
+ MPI_Status statuses[last_used_request+1];
+ int nr_finished;
+ int res;
+ if (mayBlock)
+ res = MPI_Waitsome(last_used_request+1, requests_m,
+ &nr_finished, finished, statuses);
else
- {
- if (!workQueue_m.empty())
- {
- RunnablePtr_t p = workQueue_m.front();
- workQueue_m.pop_front();
- p->execute();
- delete p;
+ res = MPI_Testsome(last_used_request+1, requests_m,
+ &nr_finished, finished, statuses);
+ PAssert(res == MPI_SUCCESS || res == MPI_ERR_IN_STATUS);
+ if (nr_finished == MPI_UNDEFINED)
+ return false;
+
+ // release finished requests
+ while (nr_finished--) {
+ if (res == MPI_ERR_IN_STATUS) {
+ if (statuses[nr_finished].MPI_ERROR != MPI_SUCCESS) {
+ char msg[MPI_MAX_ERROR_STRING+1];
+ int len;
+ MPI_Error_string(statuses[nr_finished].MPI_ERROR, msg, &len);
+ msg[len] = '\0';
+ PInsist(0, msg);
+ }
}
+ releaseMPIRequest(finished[nr_finished]);
}
+ return true;
+ }
+
+#else
+
+ static bool waitForSomeRequests(bool mayBlock)
+ {
+ return false;
+ }
+
+#endif
+
+
+ /// This function lets you check if there are iterates that are
+ /// ready to run.
+
+ static bool workReady()
+ {
+ return !(workQueue_m.empty()
+ && workQueueMessages_m.empty()
+#if POOMA_MPI
+ && allocated_requests_m.empty()
+#endif
+ );
+ }
+
+ /// Run an iterate if one is ready. Returns whether progress
+ /// was made.
+
+ static bool runSomething(bool mayBlock = true)
+ {
+ // do work in this order to minimize communication latency:
+ // - issue all messages
+ // - do some regular work
+ // - wait for messages to complete
+
+ RunnablePtr_t p = NULL;
+ if (!workQueueMessages_m.empty()) {
+ p = workQueueMessages_m.front();
+ workQueueMessages_m.pop_front();
+ } else if (!workQueue_m.empty()) {
+ p = workQueue_m.front();
+ workQueue_m.pop_front();
+ }
+
+ if (p) {
+ p->execute();
+ Iterate<SerialAsync> *it = dynamic_cast<IteratePtr_t>(p);
+ if (it) {
+ if (--(it->togo()) == 0)
+ delete it;
+ } else
+ delete p;
+ return true;
+
+ } else
+ return waitForSomeRequests(mayBlock);
}
};
-inline void addRunnable(RunnablePtr_t rn)
-{
- SystemContext::workQueue_m.push_front(rn);
-}
+/// Adds a runnable to the appropriate work-queue.
inline void add(RunnablePtr_t rn)
{
@@ -182,25 +354,18 @@
inline void wait() {}
inline void mustRunOn(){}
-/*------------------------------------------------------------------------
-CLASS
- IterateScheduler_Serial_Async
-
- Implements a asynchronous scheduler for a data driven execution.
- Specializes a IterateScheduler.
-
-KEYWORDS
- Data-parallelism, Native-interface, IterateScheduler.
-
-DESCRIPTION
-
- The SerialAsync IterateScheduler, Iterate and DataObject
- implement a SMARTS scheduler that does dataflow without threads.
- What that means is that when you hand iterates to the
- IterateScheduler it stores them up until you call
- IterateScheduler::blockingEvaluate(), at which point it evaluates
- iterates until the queue is empty.
------------------------------------------------------------------------------*/
+
+/**
+ * Implements an asynchronous scheduler for a data driven execution.
+ * Specializes an IterateScheduler.
+ *
+ * The SerialAsync IterateScheduler, Iterate and DataObject
+ * implement a SMARTS scheduler that does dataflow without threads.
+ * What that means is that when you hand iterates to the
+ * IterateScheduler it stores them up until you call
+ * IterateScheduler::blockingEvaluate(), at which point it evaluates
+ * iterates until the queue is empty.
+ */
template<>
class IterateScheduler<SerialAsync>
@@ -212,196 +377,128 @@
typedef DataObject<SerialAsync> DataObject_t;
typedef Iterate<SerialAsync> Iterate_t;
- ///////////////////////////
- // Constructor
- //
- IterateScheduler() {}
-
- ///////////////////////////
- // Destructor
- //
- ~IterateScheduler() {}
- void setConcurrency(int) {}
-
- //---------------------------------------------------------------------------
- // Mutators.
- //---------------------------------------------------------------------------
-
- ///////////////////////////
- // Tells the scheduler that the parser thread is starting a new
- // data-parallel statement. Any Iterate that is handed off to the
- // scheduler between beginGeneration() and endGeneration() belongs
- // to the same data-paralllel statement and therefore has the same
- // generation number.
- //
- inline void beginGeneration() { }
-
- ///////////////////////////
- // Tells the scheduler that no more Iterates will be handed off for
- // the data parallel statement that was begun with a
- // beginGeneration().
- //
- inline void endGeneration() {}
-
- ///////////////////////////
- // The parser thread calls this method to evaluate the generated
- // graph until all the nodes in the dependence graph has been
- // executed by the scheduler. That is to say, the scheduler
- // executes all the Iterates that has been handed off to it by the
- // parser thread.
- //
- inline
- void blockingEvaluate();
-
- ///////////////////////////
- // The parser thread calls this method to ask the scheduler to run
- // the given Iterate when the dependence on that Iterate has been
- // satisfied.
- //
- inline void handOff(Iterate<SerialAsync>* it);
+ IterateScheduler()
+ : generation_m(0)
+ {}
- inline
- void releaseIterates() { }
+ ~IterateScheduler() {}
-protected:
-private:
+ void setConcurrency(int) {}
- typedef std::list<Iterate_t*> Container_t;
- typedef Container_t::iterator Iterator_t;
+ /// Tells the scheduler that the parser thread is starting a new
+ /// data-parallel statement. Any Iterate that is handed off to the
+ /// scheduler between beginGeneration() and endGeneration() belongs
+ /// to the same data-parallel statement and therefore has the same
+ /// generation number.
+ /// Nested invocations are handled as being part of the outermost
+ /// generation.
-};
+ void beginGeneration()
+ {
+ // Ensure proper overflow behavior.
+ if (++generation_m < 0)
+ generation_m = 0;
+ generationStack_m.push(generation_m);
+ }
-//-----------------------------------------------------------------------------
+ /// Tells the scheduler that no more Iterates will be handed off for
+ /// the data parallel statement that was begun with a
+ /// beginGeneration().
-/*------------------------------------------------------------------------
-CLASS
- Iterate_SerialAsync
-
- Iterate<SerialAsync> is used to implement the SerialAsync
- scheduling policy.
-
-KEYWORDS
- Data_Parallelism, Native_Interface, IterateScheduler, Data_Flow.
-
-DESCRIPTION
- An Iterate is a non-blocking unit of concurrency that is used
- to describe a chunk of work. It inherits from the Runnable
- class and as all subclasses of Runnable, the user specializes
- the run() method to specify the operation.
- Iterate<SerialAsync> is a further specialization of the
- Iterate class to use the SerialAsync Scheduling algorithm to
- generate the data dependency graph for a data-driven
- execution. */
+ void endGeneration()
+ {
+ PAssert(inGeneration());
+ generationStack_m.pop();
-template<>
-class Iterate<SerialAsync> : public Runnable
-{
- friend class IterateScheduler<SerialAsync>;
- friend class DataObject<SerialAsync>;
+#if POOMA_MPI
+ // this is a safe point to block until we have "lots" of MPI Requests
+ if (!inGeneration())
+ while (!SystemContext::haveLotsOfMPIRequests())
+ SystemContext::runSomething(true);
+#endif
+ }
-public:
+ /// Whether we are inside a generation and may not safely block.
- typedef DataObject<SerialAsync> DataObject_t;
- typedef IterateScheduler<SerialAsync> IterateScheduler_t;
+ bool inGeneration() const
+ {
+ return !generationStack_m.empty();
+ }
+ /// What the current generation is.
- ///////////////////////////
- // The Constructor for this class takes the IterateScheduler and a
- // CPU affinity. CPU affinity has a default value of -1 which means
- // it may run on any CPU available.
- //
- inline Iterate(IterateScheduler<SerialAsync> & s, int affinity=-1);
-
- ///////////////////////////
- // The dtor is virtual because the subclasses will need to add to it.
- //
- virtual ~Iterate() {}
+ int generation() const
+ {
+ if (!inGeneration())
+ return -1;
+ return generationStack_m.top();
+ }
- ///////////////////////////
- // The run method does the core work of the Iterate.
- // It is supplied by the subclass.
- //
- virtual void run() = 0;
+ /// The parser thread calls this method to evaluate the generated
+ /// graph until all the nodes in the dependence graph have been
+ /// executed by the scheduler. That is to say, the scheduler
+ /// executes all the Iterates that have been handed off to it by the
+ /// parser thread.
- ///////////////////////////
- // Stub in all the affinities, because there is no such thing
- // in serial.
- //
- inline int affinity() const {return 0;}
- ///////////////////////////
- // Stub in all the affinities, because there is no such thing
- // in serial.
- //
- inline int hintAffinity() const {return 0;}
- ///////////////////////////
- // Stub in all the affinities, because there is no such thing
- // in serial.
- //
- inline void affinity(int) {}
- ///////////////////////////
- // Stub in all the affinities, because there is no such thing
- // in serial.
- //
- inline void hintAffinity(int) {}
+ void blockingEvaluate()
+ {
+ if (inGeneration()) {
+ // It's not safe to block inside a generation, so
+ // just do as much as we can without blocking.
+ while (SystemContext::runSomething(false))
+ ;
+
+ } else {
+ // Loop as long as there is anything in the queue.
+ while (SystemContext::workReady())
+ SystemContext::runSomething(true);
+ }
+ }
- ///////////////////////////
- // Notify is used to indicate to the Iterate that one of the data
- // objects it had requested has been granted. To do this, we dec a
- // dependence counter which, if equal to 0, the Iterate is ready for
- // execution.
- //
- inline void notify();
-
- ///////////////////////////
- // How many notifications remain?
- //
- inline
- int notifications() const { return notifications_m; }
+ /// The parser thread calls this method to ask the scheduler to run
+ /// the given Iterate when the dependence on that Iterate has been
+ /// satisfied.
- inline void addNotification()
+ void handOff(Iterate<SerialAsync>* it)
{
- notifications_m++;
+ // No action needs to be taken here. Iterates will make their
+ // own way into the execution queue.
+ it->generation() = generation();
+ it->notify();
}
-protected:
+ void releaseIterates() { }
- // What scheduler are we working with?
- IterateScheduler<SerialAsync> &scheduler_m;
+private:
- // How many notifications should we receive before we can run?
- int notifications_m;
+ typedef std::list<Iterate_t*> Container_t;
+ typedef Container_t::iterator Iterator_t;
-private:
- // Set notifications dynamically and automatically every time a
- // request is made by the iterate
- void incr_notifications() { notifications_m++;}
+ static std::stack<int> generationStack_m;
+ int generation_m;
};
-//-----------------------------------------------------------------------------
-
-/*------------------------------------------------------------------------
-CLASS
- DataObject_SerialAsync
-
- Implements a asynchronous scheduler for a data driven execution.
-KEYWORDS
- Data-parallelism, Native-interface, IterateScheduler.
-
-DESCRIPTION
- The DataObject Class is used introduce a type to represent
- a resources (normally) blocks of data) that Iterates contend
- for atomic access. Iterates make request for either a read or
- write to the DataObjects. DataObjects may grant the request if
- the object is currently available. Otherwise, the request is
- enqueue in a queue private to the data object until the
- DataObject is release by another Iterate. A set of read
- requests may be granted all at once if there are no
- intervening write request to that DataObject.
- DataObject<SerialAsync> is a specialization of DataObject for
- the policy template SerialAsync.
-*/
+/**
+ * Implements an asynchronous scheduler for a data driven execution.
+ *
+ * The DataObject class is used to introduce a type to represent
+ * a resource (normally blocks of data) that Iterates contend
+ * for atomic access to. Iterates make requests for either a read or
+ * write to the DataObjects. DataObjects may grant the request if
+ * the object is currently available. Otherwise, the request is
+ * enqueued in a queue private to the data object until the
+ * DataObject is released by another Iterate. A set of read
+ * requests may be granted all at once if there are no
+ * intervening write requests to that DataObject.
+ * DataObject<SerialAsync> is a specialization of DataObject for
+ * the policy template SerialAsync.
+ *
+ * There are two ways data can be used: to read or to write.
+ * Don't change this to give more than two states;
+ * things inside depend on that.
+ */
template<>
class DataObject<SerialAsync>
@@ -413,54 +510,56 @@
typedef IterateScheduler<SerialAsync> IterateScheduler_t;
typedef Iterate<SerialAsync> Iterate_t;
- // There are two ways data can be used: to read or to write.
- // Don't change this to give more than two states:
- // things inside depend on that.
-
- ///////////////////////////
- // Construct the data object with an empty set of requests
- // and the given affinity.
- //
- inline DataObject(int affinity=-1);
+
+ /// Construct the data object with an empty set of requests
+ /// and the given affinity.
+
+ DataObject(int affinity=-1)
+ : released_m(queue_m.end()), notifications_m(0)
+ {
+ // released_m to the end of the queue (which should also be the
+ // beginning). notifications_m to zero, since nothing has been
+ // released yet.
+ }
- ///////////////////////////
- // for compatibility with other SMARTS schedulers, accept
- // Scheduler arguments (unused)
- //
- inline
- DataObject(int affinity, IterateScheduler<SerialAsync>&);
-
- ///////////////////////////
- // Stub out affinity because there is no affinity in serial.
- //
- inline int affinity() const { return 0; }
-
- ///////////////////////////
- // Stub out affinity because there is no affinity in serial.
- //
- inline void affinity(int) {}
+ /// for compatibility with other SMARTS schedulers, accept
+ /// Scheduler arguments (unused)
- ///////////////////////////
- // An iterate makes a request for a certain action in a certain
- // generation.
- //
- inline
- void request(Iterate<SerialAsync>&, SerialAsync::Action);
-
- ///////////////////////////
- // An iterate finishes and tells the DataObject it no longer needs
- // it. If this is the last release for the current set of
- // requests, have the IterateScheduler release some more.
- //
- inline void release(SerialAsync::Action);
+ inline DataObject(int affinity, IterateScheduler<SerialAsync>&)
+ : released_m(queue_m.end()), notifications_m(0)
+ {}
+
+ /// Stub out affinity because there is no affinity in serial.
+
+ int affinity() const { return 0; }
+
+ /// Stub out affinity because there is no affinity in serial.
+
+ void affinity(int) {}
+
+ /// An iterate makes a request for a certain action in a certain
+ /// generation.
+
+ inline void request(Iterate<SerialAsync>&, SerialAsync::Action);
+
+ /// An iterate finishes and tells the DataObject it no longer needs
+ /// it. If this is the last release for the current set of
+ /// requests, have the IterateScheduler release some more.
+
+ void release(SerialAsync::Action)
+ {
+ if (--notifications_m == 0)
+ releaseIterates();
+ }
-protected:
private:
- // If release needs to let more iterates go, it calls this.
+ /// If release needs to let more iterates go, it calls this.
inline void releaseIterates();
- // The type for a request.
+ /**
+ * The type for a request.
+ */
class Request
{
public:
@@ -475,135 +574,27 @@
SerialAsync::Action act_m;
};
- // The type of the queue and iterator.
+ /// The type of the queue and iterator.
typedef std::list<Request> Container_t;
typedef Container_t::iterator Iterator_t;
- // The list of requests from various iterates.
- // They're granted in FIFO order.
+ /// The list of requests from various iterates.
+ /// They're granted in FIFO order.
Container_t queue_m;
- // Pointer to the last request that has been granted.
+ /// Pointer to the last request that has been granted.
Iterator_t released_m;
- // The number of outstanding notifications.
+ /// The number of outstanding notifications.
int notifications_m;
};
-//////////////////////////////////////////////////////////////////////
-//
-// Inline implementation of the functions for
-// IterateScheduler<SerialAsync>
-//
-//////////////////////////////////////////////////////////////////////
-
-//
-// IterateScheduler<SerialAsync>::handOff(Iterate<SerialAsync>*)
-// No action needs to be taken here. Iterates will make their
-// own way into the execution queue.
-//
+/// void DataObject::releaseIterates(SerialAsync::Action)
+/// When the last released iterate dies, we need to
+/// look at the beginning of the queue and tell more iterates
+/// that they can access this data.
inline void
-IterateScheduler<SerialAsync>::handOff(Iterate<SerialAsync>* it)
-{
- it->notify();
-}
-
-//////////////////////////////////////////////////////////////////////
-//
-// Inline implementation of the functions for Iterate<SerialAsync>
-//
-//////////////////////////////////////////////////////////////////////
-
-//
-// Iterate<SerialAsync>::Iterate
-// Construct with the scheduler and the number of notifications.
-// Ignore the affinity.
-//
-
-inline
-Iterate<SerialAsync>::Iterate(IterateScheduler<SerialAsync>& s, int)
-: scheduler_m(s), notifications_m(1)
-{
-}
-
-//
-// Iterate<SerialAsync>::notify
-// Notify the iterate that a DataObject is ready.
-// Decrement the counter, and if it is zero, alert the scheduler.
-//
-
-inline void
-Iterate<SerialAsync>::notify()
-{
- if ( --notifications_m == 0 )
- {
- add(this);
- }
-}
-
-//////////////////////////////////////////////////////////////////////
-//
-// Inline implementation of the functions for DataObject<SerialAsync>
-//
-//////////////////////////////////////////////////////////////////////
-
-//
-// DataObject::DataObject()
-// Initialize:
-// released_m to the end of the queue (which should) also be the
-// beginning. notifications_m to zero, since nothing has been
-// released yet.
-//
-
-inline
-DataObject<SerialAsync>::DataObject(int)
-: released_m(queue_m.end()), notifications_m(0)
-{
-}
-
-//
-// void DataObject::release(Action)
-// An iterate has finished and is telling the DataObject that
-// it is no longer needed.
-//
-
-inline void
-DataObject<SerialAsync>::release(SerialAsync::Action)
-{
- if ( --notifications_m == 0 )
- releaseIterates();
-}
-
-
-
-//-----------------------------------------------------------------------------
-//
-// void IterateScheduler<SerialAsync>::blockingEvaluate
-// Evaluate all the iterates in the queue.
-//
-//-----------------------------------------------------------------------------
-inline
-void
-IterateScheduler<SerialAsync>::blockingEvaluate()
-{
- // Loop as long as there is anything in the queue.
- while (SystemContext::workReady())
- {
- SystemContext::runSomething();
- }
-}
-
-//-----------------------------------------------------------------------------
-//
-// void DataObject::releaseIterates(SerialAsync::Action)
-// When the last released iterate dies, we need to
-// look at the beginning of the queue and tell more iterates
-// that they can access this data.
-//
-//-----------------------------------------------------------------------------
-inline
-void
DataObject<SerialAsync>::releaseIterates()
{
// Get rid of the reservations that have finished.
@@ -622,14 +613,17 @@
released_m->iterate().notify();
++notifications_m;
- // Record what action that one will take.
+ // Record what action that one will take
+ // and record its generation number
SerialAsync::Action act = released_m->act();
+ int generation = released_m->iterate().generation();
// Look at the next iterate.
++released_m;
// If the first one was a read, release more.
if ( act == SerialAsync::Read )
+ {
// As long as we aren't at the end and we have more reads...
while ((released_m != end) &&
@@ -642,29 +636,30 @@
// And go on to the next.
++released_m;
}
+
+ }
+
}
}
+/// void DataObject::request(Iterate&, action)
+/// An iterate makes a reservation with this DataObject for a given
+/// action in a given generation. The request may be granted
+/// immediately.
-//
-// void DataObject::request(Iterate&, action)
-// An iterate makes a reservation with this DataObject for a given
-// action in a given generation. The request may be granted
-// immediately.
-//
-inline
-void
+inline void
DataObject<SerialAsync>::request(Iterate<SerialAsync>& it,
SerialAsync::Action act)
{
// The request can be granted immediately if:
// The queue is currently empty, or
- // The request is a read and everything in the queue is a read.
+ // the request is a read and everything in the queue is a read,
+ // or (with relaxed conditions), everything is the same generation.
// Set notifications dynamically and automatically
// every time a request is made by the iterate
- it.incr_notifications();
+ it.notifications_m++;
bool allReleased = (queue_m.end() == released_m);
bool releasable = queue_m.empty() ||
@@ -691,17 +686,11 @@
}
-//----------------------------------------------------------------------
-
-
-//
-// End of Smarts namespace.
-//
-}
+} // namespace Smarts
//////////////////////////////////////////////////////////////////////
-#endif // POOMA_PACKAGE_CLASS_H
+#endif // _SerialAsync_h_
/***************************************************************************
* $RCSfile: SerialAsync.h,v $ $Author: sa_smith $
More information about the pooma-dev
mailing list