[PATCH] MPI SendReceive
Richard Guenther
rguenth at tat.physik.uni-tuebingen.de
Tue Dec 30 20:41:07 UTC 2003
Hi!
This is now the MPI version of SendReceive.h, including changes to
RemoteEngine.h, which handles (de-)serialization of engines. The latter
change allows optimizing away one of the three(!) copies we currently
make when communicating an engine at receive time:
- receive into message buffer
- deserialize into temporary brick engine
- copy temporary brick engine to target view
With this patch the message buffer is deserialized directly into the
target view (for non-Cheetah operation; with Cheetah this is not
possible); a minimal sketch of the idea follows below. A patch that
removes a fourth(!!) copy we make at guard update will follow separately.
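To illustrate the interface change (this is not part of the patch): unpack()
now takes a reference to an existing engine and deserializes into it, instead
of allocating a temporary the caller must copy from and delete. Below is a
self-contained sketch; Block, unpackOld and unpackNew are made-up stand-ins
for the engine and serializer types:

#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical stand-in for a brick engine: a flat block of T.
template<class T>
struct Block {
  explicit Block(std::size_t n) : data(n) {}
  std::vector<T> data;
};

// Old-style unpack: allocates a temporary engine; the caller copies
// from it and must call cleanup() (i.e. delete) afterwards.
template<class T>
int unpackOld(Block<T> *&a, const char *buffer, std::size_t n)
{
  a = new Block<T>(n);
  std::memcpy(a->data.data(), buffer, n * sizeof(T));
  return int(n * sizeof(T));
}

// New-style unpack: deserializes straight into the target engine,
// so there is no temporary and no extra copy.
template<class T>
int unpackNew(Block<T> &a, const char *buffer)
{
  std::memcpy(a.data.data(), buffer, a.data.size() * sizeof(T));
  return int(a.data.size() * sizeof(T));
}

int main()
{
  double src[4] = {1, 2, 3, 4};
  const char *buf = reinterpret_cast<const char *>(src);

  // Old path: message buffer -> temporary -> target view.
  Block<double> *tmp = 0;
  unpackOld(tmp, buf, 4);
  Block<double> target1(4);
  target1.data = tmp->data;   // the copy the patch eliminates
  delete tmp;                 // cleanup()

  // New path: message buffer -> target view.
  Block<double> target2(4);
  unpackNew(target2, buf);

  assert(target1.data == target2.data);
  return 0;
}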
Tested as usual.
Ok?
Richard.
2003Dec30 Richard Guenther <richard.guenther at uni-tuebingen.de>
* src/Engine/RemoteEngine.h: add deserializer into existing
engine.
* src/Tulip/SendReceive.h: add MPI variant.
===== RemoteEngine.h 1.9 vs 1.16 =====
--- 1.9/r2/src/Engine/RemoteEngine.h Wed Dec 10 11:19:05 2003
+++ 1.16/r2/src/Engine/RemoteEngine.h Tue Dec 30 21:26:06 2003
@@ -1239,6 +1241,7 @@
t = *a;
buffer_m += change;
total_m += change;
+ Cheetah::Serialize<Cheetah::CHEETAH, T>::cleanup(a);
}
char *buffer_m;
@@ -1248,6 +1251,9 @@
namespace Cheetah {
+// All these serializers/deserializers share a common header,
+// namely the domain and a compressed flag.
+
template<int Dim, class T>
class Serialize<CHEETAH, Engine<Dim, T, BrickView> >
{
@@ -1261,6 +1267,8 @@
int nBytes=0;
nBytes += Serialize<CHEETAH, Domain_t>::size(a.domain());
+ bool compressed = false;
+ nBytes += Serialize<CHEETAH, bool>::size(compressed);
nBytes += a.domain().size() * Serialize<CHEETAH, T>::size(T());
return nBytes;
@@ -1278,6 +1286,11 @@
buffer += change;
nBytes += change;
+ bool compressed = false;
+ change = Serialize<CHEETAH, bool>::pack(compressed, buffer);
+ buffer += change;
+ nBytes += change;
+
EngineElemSerialize op(buffer);
change = EngineBlockSerialize::apply(op, a, dom);
@@ -1287,20 +1300,54 @@
return nBytes;
}
+ // We support a special unpack to avoid an extra copy.
+
static inline int
- unpack(Engine_t* &a, char *buffer)
+ unpack(Engine_t &a, char *buffer)
{
- // We'll unpack into a Brick rather than a BrickView, since
- // we just copy from it anyway.
+ Interval<Dim> *dom;
- PAssert(false);
- }
+ int change;
+ int nBytes=0;
- static inline void
- cleanup(Engine_t* a)
- {
- delete a;
+ change = Serialize<CHEETAH, Domain_t>::unpack(dom, buffer);
+ buffer += change;
+ nBytes += change;
+
+ bool *compressed;
+ change = Serialize<CHEETAH, bool>::unpack(compressed, buffer);
+ buffer += change;
+ nBytes += change;
+
+ // The domains probably don't match, but at least their sizes must.
+ for (int i=0; i<Dim; ++i)
+ PAssert((*dom)[i].size() == a.domain()[i].size());
+
+ if (*compressed)
+ {
+ T *value;
+ change = Serialize<CHEETAH, T>::unpack(value, buffer);
+
+ // we can't use the usual array assignment here, because that would
+ // confuse the scheduler and lead to bogus results
+ Array<Engine_t::dimensions, T, typename Engine_t::Tag_t> lhs;
+ lhs.engine() = a;
+ Array<Engine_t::dimensions, T, ConstantFunction> rhs(*dom);
+ rhs.engine().setConstant(*value);
+ KernelEvaluator<InlineKernelTag>::evaluate(lhs, OpAssign(), rhs);
+ } else {
+ EngineElemDeSerialize op(buffer);
+
+ change = EngineBlockSerialize::apply(op, a, a.domain());
+ }
+ nBytes += change;
+
+ Serialize<CHEETAH, Domain_t>::cleanup(dom);
+ Serialize<CHEETAH, bool>::cleanup(compressed);
+
+ return nBytes;
}
+
};
template<int Dim, class T>
@@ -1316,6 +1363,8 @@
int nBytes=0;
nBytes += Serialize<CHEETAH, Domain_t>::size(a.domain());
+ bool compressed = false;
+ nBytes += Serialize<CHEETAH, bool>::size(compressed);
nBytes += a.domain().size() * Serialize<CHEETAH, T>::size(T());
return nBytes;
@@ -1333,6 +1382,11 @@
buffer += change;
nBytes += change;
+ bool compressed = false;
+ change = Serialize<CHEETAH, bool>::pack(compressed, buffer);
+ buffer += change;
+ nBytes += change;
+
EngineElemSerialize op(buffer);
change = EngineBlockSerialize::apply(op, a, dom);
@@ -1342,6 +1396,8 @@
return nBytes;
}
+ // Old-style unpack with extra copy.
+
static inline int
unpack(Engine_t* &a, char *buffer)
{
@@ -1354,6 +1410,12 @@
buffer += change;
nBytes += change;
+ bool *compressed;
+ change = Serialize<CHEETAH, bool>::unpack(compressed, buffer);
+ buffer += change;
+ nBytes += change;
+ PAssert(!*compressed);
+
a = new Engine<Dim, T, Brick>(*dom);
EngineElemDeSerialize op(buffer);
@@ -1362,6 +1424,9 @@
nBytes += change;
+ Serialize<CHEETAH, Domain_t>::cleanup(dom);
+ Serialize<CHEETAH, bool>::cleanup(compressed);
+
return nBytes;
}
@@ -1370,6 +1435,7 @@
{
delete a;
}
+
};
template<int Dim, class T>
@@ -1386,7 +1452,10 @@
nBytes += Serialize<CHEETAH, Domain_t>::size(a.domain());
- bool compressed = a.compressed();
+ // we cannot use a.compressed() here, because we need to
+ // set up a big enough receive buffer and the compressed
+ // flag is not valid across contexts.
+ bool compressed = false;
nBytes += Serialize<CHEETAH, bool>::size(compressed);
if (compressed)
@@ -1433,6 +1502,8 @@
return nBytes;
}
+ // Old-style unpack with extra copy.
+
static inline int
unpack(Engine_t* &a, char *buffer)
{
@@ -1446,7 +1517,6 @@
nBytes += change;
bool *compressed;
-
change = Serialize<CHEETAH, bool>::unpack(compressed, buffer);
buffer += change;
nBytes += change;
@@ -1469,6 +1539,9 @@
}
nBytes += change;
+ Serialize<CHEETAH, Domain_t>::cleanup(dom);
+ Serialize<CHEETAH, bool>::cleanup(compressed);
+
return nBytes;
}
@@ -1477,6 +1550,7 @@
{
delete a;
}
+
};
template<int Dim, class T>
@@ -1493,7 +1567,10 @@
nBytes += Serialize<CHEETAH, Domain_t>::size(a.domain());
- bool compressed = a.compressed();
+ // we cannot use a.compressed() here, because we need to
+ // set up a big enough receive buffer and the compressed
+ // flag is not valid across contexts.
+ bool compressed = false;
nBytes += Serialize<CHEETAH, bool>::size(compressed);
if (compressed)
@@ -1541,8 +1618,10 @@
return nBytes;
}
+ // We support a special unpack to avoid an extra copy.
+
static inline int
- unpack(Engine_t* &a, char *buffer)
+ unpack(Engine_t &a, char *buffer)
{
Interval<Dim> *dom;
@@ -1554,40 +1633,36 @@
nBytes += change;
bool *compressed;
-
change = Serialize<CHEETAH, bool>::unpack(compressed, buffer);
buffer += change;
nBytes += change;
+ // The domains probably don't match, but at least their sizes must.
+ for (int i=0; i<Dim; ++i)
+ PAssert((*dom)[i].size() == a.domain()[i].size());
+
if (*compressed)
{
T *value;
change = Serialize<CHEETAH, T>::unpack(value, buffer);
- Engine<Dim, T, CompressibleBrick> foo(*dom, *value);
-
- a = new Engine_t(foo, *dom);
+ // we can't use the usual array assignment here, because that would
+ // confuse the scheduler and lead to bogus results
+ a.compressedReadWrite() = *value;
}
else
{
- Engine<Dim, T, CompressibleBrick> foo(*dom);
-
EngineElemDeSerialize op(buffer);
- change = EngineBlockSerialize::apply(op, foo, *dom);
-
- a = new Engine_t(foo, *dom);
+ change = EngineBlockSerialize::apply(op, a, *dom);
}
nBytes += change;
- return nBytes;
- }
+ Serialize<CHEETAH, Domain_t>::cleanup(dom);
+ Serialize<CHEETAH, bool>::cleanup(compressed);
- static inline void
- cleanup(Engine_t* a)
- {
- delete a;
+ return nBytes;
}
};
--- SendReceive.h 2003-10-21 20:47:59.000000000 +0200
+++ /tmp/SendReceive.h 2003-12-30 21:34:17.000000000 +0100
@@ -57,9 +57,11 @@
// Includes:
//-----------------------------------------------------------------------------
+#include "Tulip/Messaging.h"
#include "Pooma/Pooma.h"
#include "Evaluator/InlineEvaluator.h"
-#include "Tulip/Messaging.h"
+#include "Evaluator/RequestLocks.h"
+#include "Engine/DataObject.h"
#include "Utilities/PAssert.h"
//-----------------------------------------------------------------------------
@@ -268,14 +270,228 @@
{
PAssert(fromContext >= 0);
int tag = Pooma::receiveTag(fromContext);
- Pooma::scheduler().handOff(new ReceiveIterate<View,
- IncomingView>(view,
- fromContext, tag));
+ Pooma::scheduler().handOff(new ReceiveIterate<View, IncomingView>
+ (view, fromContext, tag));
}
};
-#else // not POOMA_CHEETAH
+#elif POOMA_MPI
+
+
+/**
+ * A SendIterate requests a read lock on a piece of data. When that read lock
+ * is granted, we pack the data into a buffer and post a nonblocking MPI
+ * send to the appropriate context. We construct the SendIterate with a tag
+ * that is used to match the appropriate ReceiveIterate on the remote context.
+ */
+
+template<class View>
+class SendIterate
+ : public Pooma::Iterate_t
+{
+public:
+ SendIterate(const View &view, int toContext, int tag)
+ : Pooma::Iterate_t(Pooma::scheduler()),
+ toContext_m(toContext),
+ tag_m(tag),
+ view_m(view)
+ {
+ PAssert(toContext >= 0);
+
+ hintAffinity(engineFunctor(view_m,
+ DataObjectRequest<BlockAffinity>()));
+
+#if POOMA_REORDER_ITERATES
+ // Priority interface was added to r2 version of serial async so that
+ // message send iterates would run before any other iterates.
+ priority(-1);
+#endif
+
+ DataObjectRequest<WriteRequest> writeReq(*this);
+ DataObjectRequest<ReadRequest> readReq(writeReq);
+ engineFunctor(view_m, readReq);
+ }
+
+ virtual void run()
+ {
+ typedef Cheetah::Serialize<Cheetah::CHEETAH, View> Serialize_t;
+
+ // serialize and send buffer
+ int length = Serialize_t::size(view_m);
+ buffer_m = new char[length];
+ Serialize_t::pack(view_m, buffer_m);
+ MPI_Request *request = Smarts::SystemContext::getMPIRequest(this);
+ int res = MPI_Isend(buffer_m, length, MPI_CHAR, toContext_m, tag_m,
+ MPI_COMM_WORLD, request);
+ PAssert(res == MPI_SUCCESS);
+
+ // release locks
+ DataObjectRequest<WriteRelease> writeReq;
+ DataObjectRequest<ReadRelease> readReq(writeReq);
+ engineFunctor(view_m, readReq);
+ }
+
+ virtual ~SendIterate()
+ {
+ // cleanup temporary objects.
+ delete[] buffer_m;
+ }
+
+private:
+
+ // Context we're sending the data to.
+
+ int toContext_m;
+
+ // A tag used to match the sent data with the right receive.
+
+ int tag_m;
+
+ // Communication buffer.
+
+ char *buffer_m;
+
+ // The data we're sending (typically a view of an array).
+
+ View view_m;
+};
+
+
+/**
+ * ReceiveIterate requests a write lock on a piece of data. A nonblocking
+ * MPI receive is posted (at construction time if a free request is
+ * available, otherwise once the lock is granted), and the message is
+ * unpacked directly into the target view when the iterate is destroyed,
+ * which also releases the write lock.
+ */
+
+template<class View, class IncomingView>
+class ReceiveIterate
+ : public Pooma::Iterate_t
+{
+public:
+
+ typedef ReceiveIterate<View, IncomingView> This_t;
+
+ ReceiveIterate(const View &view, int fromContext, int tag)
+ : Pooma::Iterate_t(Pooma::scheduler()),
+ fromContext_m(fromContext),
+ tag_m(tag), buffer_m(NULL),
+ view_m(view)
+ {
+ PAssert(fromContext >= 0);
+
+ hintAffinity(engineFunctor(view,
+ DataObjectRequest<BlockAffinity>()));
+
+#if POOMA_REORDER_ITERATES
+ // Priority interface was added to r2 version of serial async so that
+ // message receive iterates would run before any other iterates.
+ priority(-1);
+#endif
+
+ DataObjectRequest<WriteRequest> writeReq(*this);
+ engineFunctor(view, writeReq);
+
+ Pooma::addIncomingMessage();
+
+ // pre-allocate incoming buffer and issue async receive
+ // we may hog requests here, so maybe we need to make this
+ // conditional on request availability?
+ if (Smarts::SystemContext::haveLotsOfMPIRequests()) {
+ int length = Cheetah::Serialize<Cheetah::CHEETAH, View>::size(view_m);
+ buffer_m = new char[length];
+ MPI_Request *request = Smarts::SystemContext::getMPIRequest(this);
+ int res = MPI_Irecv(buffer_m, length, MPI_CHAR, fromContext_m, tag_m,
+ MPI_COMM_WORLD, request);
+ PAssert(res == MPI_SUCCESS);
+ }
+ }
+
+ virtual void run()
+ {
+ // If a request was free at construction time the receive is already
+ // posted and the real work happens in the destructor; otherwise
+ // post the receive now.
+ if (!buffer_m) {
+ int length = Cheetah::Serialize<Cheetah::CHEETAH, View>::size(view_m);
+ buffer_m = new char[length];
+ MPI_Request *request = Smarts::SystemContext::getMPIRequest(this);
+ int res = MPI_Irecv(buffer_m, length, MPI_CHAR, fromContext_m, tag_m,
+ MPI_COMM_WORLD, request);
+ PAssert(res == MPI_SUCCESS);
+ }
+ }
+
+ virtual ~ReceiveIterate()
+ {
+ typedef Cheetah::Serialize<Cheetah::CHEETAH, View> Serialize_t;
+
+ // de-serialize into target view directly
+ Serialize_t::unpack(view_m, buffer_m);
+
+ // cleanup temporary objects
+ delete[] buffer_m;
+
+ // release locks
+ DataObjectRequest<WriteRelease> writeReq;
+ engineFunctor(view_m, writeReq);
+
+ Pooma::gotIncomingMessage();
+ }
+
+private:
+
+ // Context we're receiving the data from.
+
+ int fromContext_m;
+
+ // A tag used to match the received data with the right send.
+
+ int tag_m;
+
+ // Communication buffer.
+
+ char *buffer_m;
+
+ // The place to put the data we're receiving (typically a view of the
+ // engine).
+
+ View view_m;
+};
+
+/**
+ * SendReceive::send(view, context) and Receive<IncomingView>::receive(view,
+ * context) encapsulate generating matching tags for the send and the
+ * receive and launching the iterates that perform them.
+ */
+
+struct SendReceive
+{
+ template<class View>
+ static
+ void send(const View &view, int toContext)
+ {
+ int tag = Pooma::sendTag(toContext);
+ Pooma::scheduler().handOff(new SendIterate<View>(view, toContext, tag));
+ }
+};
+
+template<class IncomingView>
+struct Receive
+{
+ template<class View>
+ static
+ void receive(const View &view, int fromContext)
+ {
+ PAssert(fromContext >= 0);
+ int tag = Pooma::receiveTag(fromContext);
+ Pooma::scheduler().handOff(new ReceiveIterate<View, IncomingView>
+ (view, fromContext, tag));
+ }
+};
+
+
+#else // not POOMA_MESSAGING
/**
@@ -305,7 +521,8 @@
}
};
-#endif // not POOMA_CHEETAH
+
+#endif // not POOMA_MESSAGING
//////////////////////////////////////////////////////////////////////
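For reference, here is a minimal standalone sketch of the nonblocking
send/receive pattern the iterates above build on, using plain MPI and
independent of POOMA/Smarts (tag value and buffer size are made up for
illustration; in the patch the buffer holds the packed view and the request
is tracked by Smarts::SystemContext):

#include <mpi.h>
#include <cassert>
#include <vector>

int main(int argc, char **argv)
{
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  assert(size >= 2);

  const int tag = 42;               // must match on both contexts
  std::vector<char> buffer(1024);   // stands in for the packed view

  MPI_Request request;
  if (rank == 0) {
    // SendIterate::run(): pack the view, then post a nonblocking send.
    int res = MPI_Isend(buffer.data(), int(buffer.size()), MPI_CHAR,
                        1, tag, MPI_COMM_WORLD, &request);
    assert(res == MPI_SUCCESS);
  } else if (rank == 1) {
    // ReceiveIterate: pre-allocate the buffer, post a nonblocking receive.
    int res = MPI_Irecv(buffer.data(), int(buffer.size()), MPI_CHAR,
                        0, tag, MPI_COMM_WORLD, &request);
    assert(res == MPI_SUCCESS);
  }
  if (rank < 2) {
    // The scheduler polls the request; here we simply wait. On the
    // receiving side, unpack() would now deserialize the buffer
    // directly into the target view.
    MPI_Wait(&request, MPI_STATUS_IGNORE);
  }

  MPI_Finalize();
  return 0;
}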