[PATCH] MPI support for SerialAsync scheduler
Jeffrey D. Oldham
oldham at codesourcery.com
Mon Jan 5 21:30:43 UTC 2004
Richard Guenther wrote:
> The patch was tested as usual.
>
> Ok to commit?
I have some questions and comments interspersed below.
> Thanks, Richard.
>
>
> 2004Jan02 Richard Guenther <richard.guenther at uni-tuebingen.de>
>
> * src/Threads/IterateSchedulers/SerialAsync.h: doxygenifize,
> add std::stack<int> for generation tracking, add support for
> asyncronous MPI requests.
Add an 'h' to spell asynchronous.
> src/Threads/IterateSchedulers/SerialAsync.cmpl.cpp: define
> new static variables.
>
> --- /home/richard/src/pooma/cvs/r2/src/Threads/IterateSchedulers/SerialAsync.h 2000-06-09 00:16:50.000000000 +0200
> +++ Threads/IterateSchedulers/SerialAsync.h 2004-01-02 00:40:16.000000000 +0100
> @@ -42,48 +42,38 @@
> // DataObject<SerialAsync>
> //-----------------------------------------------------------------------------
>
> -#include <iostream>
> -
> #ifndef _SerialAsync_h_
> #define _SerialAsync_h_
> -/*
> -LIBRARY:
> - SerialAsync
> -
> -CLASSES: IterateScheduler
> -
> -CLASSES: DataObject
> -
> -CLASSES: Iterate
> -
> -OVERVIEW
> - SerialAsync IterateScheduler is a policy template to create a
> - dependence graphs and executes the graph respecting the
> - dependencies without using threads. There is no parallelism,
> - but Iterates may be executed out-of-order with respect to the
> - program text.
> -
> ------------------------------------------------------------------------------*/
> -
> -//////////////////////////////////////////////////////////////////////
>
> -//-----------------------------------------------------------------------------
> -// Overview:
> -// Smarts classes for times when you want no threads but you do want
> -// dataflow evaluation.
> -//-----------------------------------------------------------------------------
> -
> -//-----------------------------------------------------------------------------
> -// Typedefs:
> -//-----------------------------------------------------------------------------
> +/** @file
> + * @ingroup IterateSchedulers
> + * @brief
> + * Smarts classes for times when you want no threads but you do want
> + * dataflow evaluation.
> + *
> + * SerialAsync IterateScheduler is a policy template to create a
> + * dependence graphs and executes the graph respecting the
> + * dependencies without using threads.
> + * There is no (thread level) parallelism, but Iterates may be executed
> + * out-of-order with respect to the program text. Also this scheduler is
> + * used for message based parallelism in which case asynchronous execution
> + * leads to reduced communication latencies.
> + */
>
> //-----------------------------------------------------------------------------
> // Includes:
> //-----------------------------------------------------------------------------
>
> #include <list>
> +#include <vector>
> +#include <map>
> +#include <set>
> +#include <functional>
> +#include <stack>
> #include "Threads/IterateSchedulers/IterateScheduler.h"
> #include "Threads/IterateSchedulers/Runnable.h"
> +#include "Tulip/Messaging.h"
> +#include "Utilities/PAssert.h"
>
> //-----------------------------------------------------------------------------
> // Forward Declarations:
> @@ -94,76 +84,261 @@
>
> namespace Smarts {
>
> -#define MYID 0
> -#define MAX_CPUS 1
> -//
> -// Tag class for specializing IterateScheduler, Iterate and DataObject.
> -//
> +/**
> + * Tag class for specializing IterateScheduler, Iterate and DataObject.
> + */
> +
> struct SerialAsync
> {
> - enum Action { Read, Write};
> + enum Action { Read, Write };
> };
>
>
> -//-----------------------------------------------------------------------------
> +/**
> + * Iterate<SerialAsync> is used to implement the SerialAsync
> + * scheduling policy.
> + *
> + * An Iterate is a non-blocking unit of concurrency that is used
> + * to describe a chunk of work. It inherits from the Runnable
> + * class and as all subclasses of Runnable, the user specializes
> + * the run() method to specify the operation.
> + * Iterate<SerialAsync> is a further specialization of the
> + * Iterate class to use the SerialAsync Scheduling algorithm to
> + * generate the data dependency graph for a data-driven
> + * execution.
> + */
> +
> +template<>
> +class Iterate<SerialAsync> : public Runnable
> +{
> + friend class IterateScheduler<SerialAsync>;
> + friend class DataObject<SerialAsync>;
> +
> +public:
> +
> + typedef DataObject<SerialAsync> DataObject_t;
> + typedef IterateScheduler<SerialAsync> IterateScheduler_t;
> +
> +
> + /// The Constructor for this class takes the IterateScheduler and a
> + /// CPU affinity. CPU affinity has a default value of -1 which means
> + /// it may run on any CPU available.
> +
> + inline Iterate(IterateScheduler<SerialAsync> & s, int affinity=-1)
> + : scheduler_m(s), notifications_m(1), generation_m(-1), togo_m(1)
> + {}
> +
> + /// The dtor is virtual because the subclasses will need to add to it.
> +
> + virtual ~Iterate() {}
> +
> + /// The run method does the core work of the Iterate.
> + /// It is supplied by the subclass.
> +
> + virtual void run() = 0;
> +
> + //@name Stubs for the affinities
> + /// There is no such thing in serial.
> + //@{
> +
> + inline int affinity() const {return 0;}
> +
> + inline int hintAffinity() const {return 0;}
> +
> + inline void affinity(int) {}
> +
> + inline void hintAffinity(int) {}
> +
> + //@}
> +
> + /// Notify is used to indicate to the Iterate that one of the data
> + /// objects it had requested has been granted. To do this, we dec a
> + /// dependence counter which, if equal to 0, the Iterate is ready for
> + /// execution.
> +
> + void notify()
> + {
> + if (--notifications_m == 0)
> + add(this);
> + }
> +
> + /// How many notifications remain?
> +
> + int notifications() const { return notifications_m; }
> +
> + void addNotification() { notifications_m++; }
> +
> + int& generation() { return generation_m; }
> +
> + int& togo() { return togo_m; }
> +
> +protected:
> +
> + /// What scheduler are we working with?
> + IterateScheduler<SerialAsync> &scheduler_m;
> +
> + /// How many notifications should we receive before we can run?
> + int notifications_m;
> +
> + /// Which generation we were issued in.
> + int generation_m;
> +
> + /// How many times we need to go past a "did something" to be ready
> + /// for destruction?
> + int togo_m;
> +
> +};
> +
> +
> +/**
> + * FIXME.
> + */
I am wary of adding unfinished code to the code base. At the very
least, we need a more extensive comment describing what is not finished.
> struct SystemContext
> {
> void addNCpus(int) {}
> void wait() {}
> void concurrency(int){}
> - int concurrency() {return 1;}
> + int concurrency() { return 1; }
> void mustRunOn() {}
>
> // We have a separate message queue because they are
> // higher priority.
> + typedef Iterate<SerialAsync> *IteratePtr_t;
> static std::list<RunnablePtr_t> workQueueMessages_m;
> static std::list<RunnablePtr_t> workQueue_m;
> +#if POOMA_MPI
> + static MPI_Request requests_m[1024];
What is this fixed constant of 1024? Does this come from the MPI standard?
> + static std::map<int, IteratePtr_t> allocated_requests_m;
> + static std::set<int> free_requests_m;
> +#endif
> +
> +
> +#if POOMA_MPI
>
> - ///////////////////////////
> - // This function lets you check if there are iterates that are
> - // ready to run.
> - inline static
> - bool workReady()
> + /// Query, if we have lots of MPI_Request slots available
> +
> + static bool haveLotsOfMPIRequests()
> {
> - return !(workQueue_m.empty() && workQueueMessages_m.empty());
> + return free_requests_m.size() > 1024/2;
> }
>
> - ///////////////////////////
> - // Run an iterate if one is ready.
> - inline static
> - void runSomething()
> + /// Get a MPI_Request slot, associated with an iterate
> +
> + static MPI_Request* getMPIRequest(IteratePtr_t p)
> {
> - if (!workQueueMessages_m.empty())
> - {
> - // Get the top iterate.
> - // Delete it from the queue.
> - // Run the iterate.
> - // Delete the iterate. This could put more iterates in the queue.
> + PInsist(!free_requests_m.empty(), "No free MPIRequest slots.");
> + int i = *free_requests_m.begin();
> + free_requests_m.erase(free_requests_m.begin());
> + allocated_requests_m[i] = p;
> + p->togo()++;
> + return &requests_m[i];
> + }
>
> - RunnablePtr_t p = workQueueMessages_m.front();
> - workQueueMessages_m.pop_front();
> - p->execute();
> + static void releaseMPIRequest(int i)
> + {
> + IteratePtr_t p = allocated_requests_m[i];
> + allocated_requests_m.erase(i);
> + free_requests_m.insert(i);
> + if (--(p->togo()) == 0)
> delete p;
> - }
> + }
> +
> + static bool waitForSomeRequests(bool mayBlock)
> + {
> + if (allocated_requests_m.empty())
> + return false;
> +
> + int last_used_request = allocated_requests_m.rbegin()->first;
> + int finished[last_used_request+1];
> + MPI_Status statuses[last_used_request+1];
> + int nr_finished;
> + int res;
> + if (mayBlock)
> + res = MPI_Waitsome(last_used_request+1, requests_m,
> + &nr_finished, finished, statuses);
> else
> - {
> - if (!workQueue_m.empty())
> - {
> - RunnablePtr_t p = workQueue_m.front();
> - workQueue_m.pop_front();
> - p->execute();
> - delete p;
> + res = MPI_Testsome(last_used_request+1, requests_m,
> + &nr_finished, finished, statuses);
> + PAssert(res == MPI_SUCCESS || res == MPI_ERR_IN_STATUS);
> + if (nr_finished == MPI_UNDEFINED)
> + return false;
> +
> +    // release finished requests
> + while (nr_finished--) {
> + if (res == MPI_ERR_IN_STATUS) {
> + if (statuses[nr_finished].MPI_ERROR != MPI_SUCCESS) {
> + char msg[MPI_MAX_ERROR_STRING+1];
> + int len;
> + MPI_Error_string(statuses[nr_finished].MPI_ERROR, msg, &len);
> + msg[len] = '\0';
> + PInsist(0, msg);
> + }
> }
> + releaseMPIRequest(finished[nr_finished]);
> }
> + return true;
> + }
> +
> +#else
> +
> + static bool waitForSomeRequests(bool mayBlock)
> + {
> + return false;
> + }
> +
> +#endif
> +
> +
> + /// This function lets you check if there are iterates that are
> + /// ready to run.
> +
> + static bool workReady()
> + {
> + return !(workQueue_m.empty()
> + && workQueueMessages_m.empty()
> +#if POOMA_MPI
> + && allocated_requests_m.empty()
> +#endif
> + );
> + }
> +
> + /// Run an iterate if one is ready. Returns if progress
> + /// was made.
> +
> + static bool runSomething(bool mayBlock = true)
> + {
> + // do work in this order to minimize communication latency:
> + // - issue all messages
> + // - do some regular work
> + // - wait for messages to complete
> +
> + RunnablePtr_t p = NULL;
> + if (!workQueueMessages_m.empty()) {
> + p = workQueueMessages_m.front();
> + workQueueMessages_m.pop_front();
> + } else if (!workQueue_m.empty()) {
> + p = workQueue_m.front();
> + workQueue_m.pop_front();
> + }
> +
> + if (p) {
> + p->execute();
> + Iterate<SerialAsync> *it = dynamic_cast<IteratePtr_t>(p);
> + if (it) {
> + if (--(it->togo()) == 0)
> + delete it;
> + } else
> + delete p;
> + return true;
> +
> + } else
> + return waitForSomeRequests(mayBlock);
> }
>
> };
>
> -inline void addRunnable(RunnablePtr_t rn)
> -{
> - SystemContext::workQueue_m.push_front(rn);
> -}
> +/// Adds a runnable to the appropriate work-queue.
>
> inline void add(RunnablePtr_t rn)
> {
> @@ -182,25 +357,18 @@
> inline void wait() {}
> inline void mustRunOn(){}
>
> -/*------------------------------------------------------------------------
> -CLASS
> - IterateScheduler_Serial_Async
> -
> - Implements a asynchronous scheduler for a data driven execution.
> - Specializes a IterateScheduler.
> -
> -KEYWORDS
> - Data-parallelism, Native-interface, IterateScheduler.
> -
> -DESCRIPTION
> -
> - The SerialAsync IterateScheduler, Iterate and DataObject
> - implement a SMARTS scheduler that does dataflow without threads.
> - What that means is that when you hand iterates to the
> - IterateScheduler it stores them up until you call
> - IterateScheduler::blockingEvaluate(), at which point it evaluates
> - iterates until the queue is empty.
> ------------------------------------------------------------------------------*/
> +
> +/**
> + * Implements an asynchronous scheduler for a data driven execution.
> + * Specializes a IterateScheduler.
> + *
> + * The SerialAsync IterateScheduler, Iterate and DataObject
> + * implement a SMARTS scheduler that does dataflow without threads.
> + * What that means is that when you hand iterates to the
> + * IterateScheduler it stores them up until you call
> + * IterateScheduler::blockingEvaluate(), at which point it evaluates
> + * iterates until the queue is empty.
> + */
>
> template<>
> class IterateScheduler<SerialAsync>
> @@ -212,196 +380,128 @@
> typedef DataObject<SerialAsync> DataObject_t;
> typedef Iterate<SerialAsync> Iterate_t;
>
> - ///////////////////////////
> - // Constructor
> - //
> - IterateScheduler() {}
> -
> - ///////////////////////////
> - // Destructor
> - //
> - ~IterateScheduler() {}
> - void setConcurrency(int) {}
> -
> - //---------------------------------------------------------------------------
> - // Mutators.
> - //---------------------------------------------------------------------------
> -
> - ///////////////////////////
> - // Tells the scheduler that the parser thread is starting a new
> - // data-parallel statement. Any Iterate that is handed off to the
> - // scheduler between beginGeneration() and endGeneration() belongs
> - // to the same data-paralllel statement and therefore has the same
> - // generation number.
> - //
> - inline void beginGeneration() { }
> -
> - ///////////////////////////
> - // Tells the scheduler that no more Iterates will be handed off for
> - // the data parallel statement that was begun with a
> - // beginGeneration().
> - //
> - inline void endGeneration() {}
> -
> - ///////////////////////////
> - // The parser thread calls this method to evaluate the generated
> - // graph until all the nodes in the dependence graph has been
> - // executed by the scheduler. That is to say, the scheduler
> - // executes all the Iterates that has been handed off to it by the
> - // parser thread.
> - //
> - inline
> - void blockingEvaluate();
> -
> - ///////////////////////////
> - // The parser thread calls this method to ask the scheduler to run
> - // the given Iterate when the dependence on that Iterate has been
> - // satisfied.
> - //
> - inline void handOff(Iterate<SerialAsync>* it);
> + IterateScheduler()
> + : generation_m(0)
> + {}
>
> - inline
> - void releaseIterates() { }
> + ~IterateScheduler() {}
>
> -protected:
> -private:
> + void setConcurrency(int) {}
>
> - typedef std::list<Iterate_t*> Container_t;
> - typedef Container_t::iterator Iterator_t;
> + /// Tells the scheduler that the parser thread is starting a new
> + /// data-parallel statement. Any Iterate that is handed off to the
> + /// scheduler between beginGeneration() and endGeneration() belongs
> +  /// to the same data-parallel statement and therefore has the same
> + /// generation number.
> + /// Nested invocations are handled as being part of the outermost
> + /// generation.
>
> -};
> + void beginGeneration()
> + {
> + // Ensure proper overflow behavior.
> + if (++generation_m < 0)
> + generation_m = 0;
> + generationStack_m.push(generation_m);
> + }
>
> -//-----------------------------------------------------------------------------
> + /// Tells the scheduler that no more Iterates will be handed off for
> + /// the data parallel statement that was begun with a
> + /// beginGeneration().
>
> -/*------------------------------------------------------------------------
> -CLASS
> - Iterate_SerialAsync
> -
> - Iterate<SerialAsync> is used to implement the SerialAsync
> - scheduling policy.
> -
> -KEYWORDS
> - Data_Parallelism, Native_Interface, IterateScheduler, Data_Flow.
> -
> -DESCRIPTION
> - An Iterate is a non-blocking unit of concurrency that is used
> - to describe a chunk of work. It inherits from the Runnable
> - class and as all subclasses of Runnable, the user specializes
> - the run() method to specify the operation.
> - Iterate<SerialAsync> is a further specialization of the
> - Iterate class to use the SerialAsync Scheduling algorithm to
> - generate the data dependency graph for a data-driven
> - execution. */
> + void endGeneration()
> + {
> + PAssert(inGeneration());
> + generationStack_m.pop();
>
> -template<>
> -class Iterate<SerialAsync> : public Runnable
> -{
> - friend class IterateScheduler<SerialAsync>;
> - friend class DataObject<SerialAsync>;
> +#if POOMA_MPI
> + // this is a safe point to block until we have "lots" of MPI Requests
> + if (!inGeneration())
> + while (!SystemContext::haveLotsOfMPIRequests())
> + SystemContext::runSomething(true);
> +#endif
> + }
>
> -public:
> +  /// Whether we are inside a generation and may not safely block.
>
> - typedef DataObject<SerialAsync> DataObject_t;
> - typedef IterateScheduler<SerialAsync> IterateScheduler_t;
> + bool inGeneration() const
> + {
> + return !generationStack_m.empty();
> + }
>
> + /// What the current generation is.
>
> - ///////////////////////////
> - // The Constructor for this class takes the IterateScheduler and a
> - // CPU affinity. CPU affinity has a default value of -1 which means
> - // it may run on any CPU available.
> - //
> - inline Iterate(IterateScheduler<SerialAsync> & s, int affinity=-1);
> -
> - ///////////////////////////
> - // The dtor is virtual because the subclasses will need to add to it.
> - //
> - virtual ~Iterate() {}
> + int generation() const
> + {
> + if (!inGeneration())
> + return -1;
> + return generationStack_m.top();
> + }
>
> - ///////////////////////////
> - // The run method does the core work of the Iterate.
> - // It is supplied by the subclass.
> - //
> - virtual void run() = 0;
> + /// The parser thread calls this method to evaluate the generated
> + /// graph until all the nodes in the dependence graph has been
> + /// executed by the scheduler. That is to say, the scheduler
> + /// executes all the Iterates that has been handed off to it by the
> + /// parser thread.
>
> - ///////////////////////////
> - // Stub in all the affinities, because there is no such thing
> - // in serial.
> - //
> - inline int affinity() const {return 0;}
> - ///////////////////////////
> - // Stub in all the affinities, because there is no such thing
> - // in serial.
> - //
> - inline int hintAffinity() const {return 0;}
> - ///////////////////////////
> - // Stub in all the affinities, because there is no such thing
> - // in serial.
> - //
> - inline void affinity(int) {}
> - ///////////////////////////
> - // Stub in all the affinities, because there is no such thing
> - // in serial.
> - //
> - inline void hintAffinity(int) {}
> + void blockingEvaluate()
> + {
> + if (inGeneration()) {
> + // It's not safe to block inside a generation, so
> + // just do as much as we can without blocking.
> + while (SystemContext::runSomething(false))
> + ;
> +
> + } else {
> + // Loop as long as there is anything in the queue.
> + while (SystemContext::workReady())
> + SystemContext::runSomething(true);
> + }
> + }
>
> - ///////////////////////////
> - // Notify is used to indicate to the Iterate that one of the data
> - // objects it had requested has been granted. To do this, we dec a
> - // dependence counter which, if equal to 0, the Iterate is ready for
> - // execution.
> - //
> - inline void notify();
> -
> - ///////////////////////////
> - // How many notifications remain?
> - //
> - inline
> - int notifications() const { return notifications_m; }
> + /// The parser thread calls this method to ask the scheduler to run
> + /// the given Iterate when the dependence on that Iterate has been
> + /// satisfied.
>
> - inline void addNotification()
> + void handOff(Iterate<SerialAsync>* it)
> {
> - notifications_m++;
> + // No action needs to be taken here. Iterates will make their
> + // own way into the execution queue.
> + it->generation() = generation();
> + it->notify();
> }
>
> -protected:
> + void releaseIterates() { }
>
> - // What scheduler are we working with?
> - IterateScheduler<SerialAsync> &scheduler_m;
> +private:
>
> - // How many notifications should we receive before we can run?
> - int notifications_m;
> + typedef std::list<Iterate_t*> Container_t;
> + typedef Container_t::iterator Iterator_t;
>
> -private:
> - // Set notifications dynamically and automatically every time a
> - // request is made by the iterate
> - void incr_notifications() { notifications_m++;}
> + static std::stack<int> generationStack_m;
> + int generation_m;
>
> };
>
>
> -//-----------------------------------------------------------------------------
> -
> -/*------------------------------------------------------------------------
> -CLASS
> - DataObject_SerialAsync
> -
> - Implements a asynchronous scheduler for a data driven execution.
> -KEYWORDS
> - Data-parallelism, Native-interface, IterateScheduler.
> -
> -DESCRIPTION
> - The DataObject Class is used introduce a type to represent
> - a resources (normally) blocks of data) that Iterates contend
> - for atomic access. Iterates make request for either a read or
> - write to the DataObjects. DataObjects may grant the request if
> - the object is currently available. Otherwise, the request is
> - enqueue in a queue private to the data object until the
> - DataObject is release by another Iterate. A set of read
> - requests may be granted all at once if there are no
> - intervening write request to that DataObject.
> - DataObject<SerialAsync> is a specialization of DataObject for
> - the policy template SerialAsync.
> -*/
> +/**
> + * Implements an asynchronous scheduler for a data driven execution.
> + *
> + * The DataObject class is used to introduce a type to represent
> + * resources (normally blocks of data) that Iterates contend
> + * for atomic access. Iterates make requests for either a read or
> + * write to the DataObjects. DataObjects may grant the request if
> + * the object is currently available. Otherwise, the request is
> + * enqueued in a queue private to the data object until the
> + * DataObject is released by another Iterate. A set of read
> + * requests may be granted all at once if there are no
> + * intervening write request to that DataObject.
> + * DataObject<SerialAsync> is a specialization of DataObject for
> + * the policy template SerialAsync.
> + *
> + * There are two ways data can be used: to read or to write.
> + * Don't change this to give more than two states;
> + * things inside depend on that.
> + */
>
> template<>
> class DataObject<SerialAsync>
> @@ -413,54 +513,56 @@
> typedef IterateScheduler<SerialAsync> IterateScheduler_t;
> typedef Iterate<SerialAsync> Iterate_t;
>
> - // There are two ways data can be used: to read or to write.
> - // Don't change this to give more than two states:
> - // things inside depend on that.
> -
> - ///////////////////////////
> - // Construct the data object with an empty set of requests
> - // and the given affinity.
> - //
> - inline DataObject(int affinity=-1);
> +
> + /// Construct the data object with an empty set of requests
> + /// and the given affinity.
> +
> + DataObject(int affinity=-1)
> + : released_m(queue_m.end()), notifications_m(0)
> + {
> + // released_m to the end of the queue (which should) also be the
> + // beginning. notifications_m to zero, since nothing has been
> + // released yet.
> + }
>
> - ///////////////////////////
> - // for compatibility with other SMARTS schedulers, accept
> - // Scheduler arguments (unused)
> - //
> - inline
> - DataObject(int affinity, IterateScheduler<SerialAsync>&);
> -
> - ///////////////////////////
> - // Stub out affinity because there is no affinity in serial.
> - //
> - inline int affinity() const { return 0; }
> -
> - ///////////////////////////
> - // Stub out affinity because there is no affinity in serial.
> - //
> - inline void affinity(int) {}
> + /// for compatibility with other SMARTS schedulers, accept
> + /// Scheduler arguments (unused)
>
> - ///////////////////////////
> - // An iterate makes a request for a certain action in a certain
> - // generation.
> - //
> - inline
> - void request(Iterate<SerialAsync>&, SerialAsync::Action);
> -
> - ///////////////////////////
> - // An iterate finishes and tells the DataObject it no longer needs
> - // it. If this is the last release for the current set of
> - // requests, have the IterateScheduler release some more.
> - //
> - inline void release(SerialAsync::Action);
> + inline DataObject(int affinity, IterateScheduler<SerialAsync>&)
> + : released_m(queue_m.end()), notifications_m(0)
> + {}
> +
> + /// Stub out affinity because there is no affinity in serial.
> +
> + int affinity() const { return 0; }
> +
> + /// Stub out affinity because there is no affinity in serial.
> +
> + void affinity(int) {}
> +
> + /// An iterate makes a request for a certain action in a certain
> + /// generation.
> +
> + inline void request(Iterate<SerialAsync>&, SerialAsync::Action);
> +
> + /// An iterate finishes and tells the DataObject it no longer needs
> + /// it. If this is the last release for the current set of
> + /// requests, have the IterateScheduler release some more.
> +
> + void release(SerialAsync::Action)
> + {
> + if (--notifications_m == 0)
> + releaseIterates();
> + }
>
> -protected:
> private:
>
> - // If release needs to let more iterates go, it calls this.
> + /// If release needs to let more iterates go, it calls this.
> inline void releaseIterates();
>
> - // The type for a request.
> + /**
> + * The type for a request.
> + */
> class Request
> {
> public:
> @@ -475,135 +577,27 @@
> SerialAsync::Action act_m;
> };
>
> - // The type of the queue and iterator.
> + /// The type of the queue and iterator.
> typedef std::list<Request> Container_t;
> typedef Container_t::iterator Iterator_t;
>
> - // The list of requests from various iterates.
> - // They're granted in FIFO order.
> + /// The list of requests from various iterates.
> + /// They're granted in FIFO order.
> Container_t queue_m;
>
> - // Pointer to the last request that has been granted.
> + /// Pointer to the last request that has been granted.
> Iterator_t released_m;
>
> - // The number of outstanding notifications.
> + /// The number of outstanding notifications.
> int notifications_m;
> };
>
> -//////////////////////////////////////////////////////////////////////
> -//
> -// Inline implementation of the functions for
> -// IterateScheduler<SerialAsync>
> -//
> -//////////////////////////////////////////////////////////////////////
> -
> -//
> -// IterateScheduler<SerialAsync>::handOff(Iterate<SerialAsync>*)
> -// No action needs to be taken here. Iterates will make their
> -// own way into the execution queue.
> -//
> -
> -inline void
> -IterateScheduler<SerialAsync>::handOff(Iterate<SerialAsync>* it)
> -{
> - it->notify();
> -}
> -
> -//////////////////////////////////////////////////////////////////////
> -//
> -// Inline implementation of the functions for Iterate<SerialAsync>
> -//
> -//////////////////////////////////////////////////////////////////////
> -
> -//
> -// Iterate<SerialAsync>::Iterate
> -// Construct with the scheduler and the number of notifications.
> -// Ignore the affinity.
> -//
> -
> -inline
> -Iterate<SerialAsync>::Iterate(IterateScheduler<SerialAsync>& s, int)
> -: scheduler_m(s), notifications_m(1)
> -{
> -}
> -
> -//
> -// Iterate<SerialAsync>::notify
> -// Notify the iterate that a DataObject is ready.
> -// Decrement the counter, and if it is zero, alert the scheduler.
> -//
> -
> -inline void
> -Iterate<SerialAsync>::notify()
> -{
> - if ( --notifications_m == 0 )
> - {
> - add(this);
> - }
> -}
> -
> -//////////////////////////////////////////////////////////////////////
> -//
> -// Inline implementation of the functions for DataObject<SerialAsync>
> -//
> -//////////////////////////////////////////////////////////////////////
> -
> -//
> -// DataObject::DataObject()
> -// Initialize:
> -// released_m to the end of the queue (which should) also be the
> -// beginning. notifications_m to zero, since nothing has been
> -// released yet.
> -//
> -
> -inline
> -DataObject<SerialAsync>::DataObject(int)
> -: released_m(queue_m.end()), notifications_m(0)
> -{
> -}
> -
> -//
> -// void DataObject::release(Action)
> -// An iterate has finished and is telling the DataObject that
> -// it is no longer needed.
> -//
> +/// void DataObject::releaseIterates(SerialAsync::Action)
> +/// When the last released iterate dies, we need to
> +/// look at the beginning of the queue and tell more iterates
> +/// that they can access this data.
>
> inline void
> -DataObject<SerialAsync>::release(SerialAsync::Action)
> -{
> - if ( --notifications_m == 0 )
> - releaseIterates();
> -}
> -
> -
> -
> -//-----------------------------------------------------------------------------
> -//
> -// void IterateScheduler<SerialAsync>::blockingEvaluate
> -// Evaluate all the iterates in the queue.
> -//
> -//-----------------------------------------------------------------------------
> -inline
> -void
> -IterateScheduler<SerialAsync>::blockingEvaluate()
> -{
> - // Loop as long as there is anything in the queue.
> - while (SystemContext::workReady())
> - {
> - SystemContext::runSomething();
> - }
> -}
> -
> -//-----------------------------------------------------------------------------
> -//
> -// void DataObject::releaseIterates(SerialAsync::Action)
> -// When the last released iterate dies, we need to
> -// look at the beginning of the queue and tell more iterates
> -// that they can access this data.
> -//
> -//-----------------------------------------------------------------------------
> -inline
> -void
> DataObject<SerialAsync>::releaseIterates()
> {
> // Get rid of the reservations that have finished.
> @@ -622,14 +616,17 @@
> released_m->iterate().notify();
> ++notifications_m;
>
> - // Record what action that one will take.
> + // Record what action that one will take
> + // and record its generation number
> SerialAsync::Action act = released_m->act();
> + int generation = released_m->iterate().generation();
>
> // Look at the next iterate.
> ++released_m;
>
> // If the first one was a read, release more.
> if ( act == SerialAsync::Read )
> + {
>
> // As long as we aren't at the end and we have more reads...
> while ((released_m != end) &&
> @@ -642,29 +639,30 @@
> // And go on to the next.
> ++released_m;
> }
> +
> + }
> +
> }
> }
>
> +/// void DataObject::request(Iterate&, action)
> +/// An iterate makes a reservation with this DataObject for a given
> +/// action in a given generation. The request may be granted
> +/// immediately.
>
> -//
> -// void DataObject::request(Iterate&, action)
> -// An iterate makes a reservation with this DataObject for a given
> -// action in a given generation. The request may be granted
> -// immediately.
> -//
> -inline
> -void
> +inline void
> DataObject<SerialAsync>::request(Iterate<SerialAsync>& it,
> SerialAsync::Action act)
>
> {
> // The request can be granted immediately if:
> // The queue is currently empty, or
> - // The request is a read and everything in the queue is a read.
> + // the request is a read and everything in the queue is a read,
> + // or (with relaxed conditions), everything is the same generation.
>
> // Set notifications dynamically and automatically
> // every time a request is made by the iterate
> - it.incr_notifications();
> + it.notifications_m++;
>
> bool allReleased = (queue_m.end() == released_m);
> bool releasable = queue_m.empty() ||
> @@ -691,17 +689,11 @@
> }
>
>
> -//----------------------------------------------------------------------
> -
> -
> -//
> -// End of Smarts namespace.
> -//
> -}
> +} // namespace Smarts
>
> //////////////////////////////////////////////////////////////////////
>
> -#endif // POOMA_PACKAGE_CLASS_H
> +#endif // _SerialAsync_h_
>
> /***************************************************************************
> * $RCSfile: SerialAsync.h,v $ $Author: sa_smith $
> --- /home/richard/src/pooma/cvs/r2/src/Threads/IterateSchedulers/SerialAsync.cmpl.cpp 2000-04-12 02:08:06.000000000 +0200
> +++ Threads/IterateSchedulers/SerialAsync.cmpl.cpp 2004-01-02 00:40:16.000000000 +0100
> @@ -82,6 +82,12 @@
>
> std::list<RunnablePtr_t> SystemContext::workQueueMessages_m;
> std::list<RunnablePtr_t> SystemContext::workQueue_m;
> +#if POOMA_MPI
> + MPI_Request SystemContext::requests_m[1024];
> + std::map<int, SystemContext::IteratePtr_t> SystemContext::allocated_requests_m;
> + std::set<int> SystemContext::free_requests_m;
> +#endif
> +std::stack<int> IterateScheduler<SerialAsync>::generationStack_m;
>
> }
>
--
Jeffrey D. Oldham
oldham at codesourcery.com
More information about the pooma-dev
mailing list