add mpi_bcast/2

git-svn-id: https://yap.svn.sf.net/svnroot/yap/trunk@416 b08c6af1-5177-4d33-ba66-4b1c6b8b522a
This commit is contained in:
stasinos 2002-03-12 20:03:55 +00:00
parent bd8cb2e836
commit 5253df47dc

View File

@ -9,14 +9,14 @@
************************************************************************** **************************************************************************
* * * *
* File: mpi.c * * File: mpi.c *
* Last rev: $Date: 2002-02-27 13:41:24 $ * * Last rev: $Date: 2002-03-12 20:03:55 $ *
* mods: * * mods: *
* comments: Interface to an MPI library * * comments: Interface to an MPI library *
* * * *
*************************************************************************/ *************************************************************************/
#ifndef lint #ifndef lint
static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpi.c,v 1.5 2002-02-27 13:41:24 stasinos Exp $"; static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpi.c,v 1.6 2002-03-12 20:03:55 stasinos Exp $";
#endif #endif
#include "Yap.h" #include "Yap.h"
@ -37,7 +37,8 @@ STATIC_PROTO (Int p_mpi_open, (void));
STATIC_PROTO (Int p_mpi_close, (void)); STATIC_PROTO (Int p_mpi_close, (void));
STATIC_PROTO (Int p_mpi_send, (void)); STATIC_PROTO (Int p_mpi_send, (void));
STATIC_PROTO (Int p_mpi_receive, (void)); STATIC_PROTO (Int p_mpi_receive, (void));
STATIC_PROTO (Int p_mpi_bcast, (void)); STATIC_PROTO (Int p_mpi_bcast3, (void));
STATIC_PROTO (Int p_mpi_bcast2, (void));
STATIC_PROTO (Int p_mpi_barrier, (void)); STATIC_PROTO (Int p_mpi_barrier, (void));
@ -139,7 +140,7 @@ mpi_parse(void)
/* we got the end of file from an abort */ /* we got the end of file from an abort */
if (ErrorMessage == "Abort") { if (ErrorMessage == "Abort") {
TR = old_TR; TR = old_TR;
return(NULL); return TermNil;
} }
/* we need to force the next reading to also give end of file.*/ /* we need to force the next reading to also give end of file.*/
buf[bufptr] = EOF; buf[bufptr] = EOF;
@ -150,11 +151,11 @@ mpi_parse(void)
if( unify_constant (ARG2, MkAtomTerm (AtomEof)) ) { if( unify_constant (ARG2, MkAtomTerm (AtomEof)) ) {
/* this might be a reasonable place to reach, but i don't know when */ /* this might be a reasonable place to reach, but i don't know when */
puts("1XXXXXXXXXXXXXXXXXX"); puts("1XXXXXXXXXXXXXXXXXX");
return(NULL); return TermNil ;
} }
else { else {
puts("2XXXXXXXXXXXXXXXXXX"); puts("2XXXXXXXXXXXXXXXXXX");
return NULL; return TermNil;
} }
} }
} }
@ -177,7 +178,7 @@ mpi_parse(void)
YP_fprintf (YP_stderr, " ]\n"); YP_fprintf (YP_stderr, " ]\n");
Error(SYSTEM_ERROR,TermNil,NULL); Error(SYSTEM_ERROR,TermNil,NULL);
return(NULL); return TermNil;
} else { } else {
/* parsing succeeded */ /* parsing succeeded */
@ -242,7 +243,6 @@ static Int
p_mpi_send() /* mpi_send(+data, +destination, +tag) */ p_mpi_send() /* mpi_send(+data, +destination, +tag) */
{ {
Term t_data = Deref(ARG1), t_dest = Deref(ARG2), t_tag = Deref(ARG3); Term t_data = Deref(ARG1), t_dest = Deref(ARG2), t_tag = Deref(ARG3);
char *data;
int tag, dest, retv; int tag, dest, retv;
/* The first argument (data) must be bound */ /* The first argument (data) must be bound */
@ -366,7 +366,7 @@ p_mpi_receive() /* mpi_receive(-data, ?orig, ?tag) */
static Int static Int
p_mpi_bcast() /* mpi_bcast( ?data, +root, +max_size ) */ p_mpi_bcast3() /* mpi_bcast( ?data, +root, +max_size ) */
{ {
Term t_data = Deref(ARG1), t_root = Deref(ARG2), t_max_size = Deref(ARG3); Term t_data = Deref(ARG1), t_root = Deref(ARG2), t_max_size = Deref(ARG3);
int root, retv, max_size; int root, retv, max_size;
@ -393,8 +393,8 @@ p_mpi_bcast() /* mpi_bcast( ?data, +root, +max_size ) */
required by the parser. */ required by the parser. */
buf[bufptr] = 0; buf[bufptr] = 0;
strcat( buf, ". " ); strcat( buf, ". " );
bufstrlen = bufptr + 2;
} }
bufstrlen = bufptr + 2;
/* The third argument must be bound to an integer (the maximum length /* The third argument must be bound to an integer (the maximum length
of the broadcast term's ASCII representation */ of the broadcast term's ASCII representation */
@ -406,7 +406,7 @@ p_mpi_bcast() /* mpi_bcast( ?data, +root, +max_size ) */
max_size = IntOfTerm( t_max_size ) + 3; max_size = IntOfTerm( t_max_size ) + 3;
#if 0 #if 0
if( max_size < bufstrlen ) if( (rank == root) && (max_size < bufstrlen) )
/* issue a warning? explode? bcast s'thing unparsable? */ /* issue a warning? explode? bcast s'thing unparsable? */
#endif #endif
@ -436,6 +436,76 @@ p_mpi_bcast() /* mpi_bcast( ?data, +root, +max_size ) */
} }
/* This is the same as above, but for dynamic data size.
It is implemented as two broadcasts, the first being the size
and the second the actual data.
*/
static Int
p_mpi_bcast2() /* mpi_bcast( ?data, +root ) */
{
Term t_data = Deref(ARG1), t_root = Deref(ARG2);
int root, retv;
/* The second argument must be bound to an integer (the rank of
root processor */
if (IsVarTerm(t_root)) {
Error(INSTANTIATION_ERROR, t_root, "mpi_bcast");
return FALSE;
}
root = IntOfTerm( t_root );
/* If this is the root processor, then the first argument must
be bound to the term to be sent. */
if( root == rank ) {
if( IsVarTerm(t_data) ) {
Error(INSTANTIATION_ERROR, t_data, "mpi_bcast");
return FALSE;
}
bufptr = 0;
/* Turn the term into its ASCII representation */
plwrite( t_data, mpi_putc, 5 );
/* NULL-terminate the string and add the ". " termination
required by the parser. */
buf[bufptr] = 0;
strcat( buf, ". " );
bufstrlen = bufptr + 2;
}
/* Broadcast the data size */
retv = MPI_Bcast( &bufstrlen, sizeof bufstrlen, MPI_INT, root, MPI_COMM_WORLD );
if( retv != 0 ) return FALSE;
#if 1
printf("I am %d and I think the data is %d bytes long!\n", rank, bufstrlen);
#endif
/* adjust the buffer size, if necessary */
if( bufstrlen-bufsize > 0 ) {
printf("expanding by %d\n", bufstrlen-bufsize);
expand_buffer( bufstrlen - bufsize );
}
/* Broadcast the data */
retv = MPI_Bcast( buf, bufstrlen, MPI_CHAR, root, MPI_COMM_WORLD );
if( retv != 0 ) return FALSE;
if( root == rank ) return TRUE;
else {
/* ARG1 must be unbound so that it can receive data */
if( !IsVarTerm(t_data) ) {
Error(INSTANTIATION_ERROR, t_root, "mpi_bcast");
return FALSE;
}
bufstrlen = strlen(buf);
bufptr = 0;
/* parse received string into a Prolog term */
return unify(mpi_parse(), ARG1);
}
}
static Int static Int
p_mpi_barrier() /* mpi_barrier/0 */ p_mpi_barrier() /* mpi_barrier/0 */
{ {
@ -494,7 +564,8 @@ InitMPI(void)
InitCPred( "mpi_close", 0, p_mpi_close, SafePredFlag ); InitCPred( "mpi_close", 0, p_mpi_close, SafePredFlag );
InitCPred( "mpi_send", 3, p_mpi_send, SafePredFlag ); InitCPred( "mpi_send", 3, p_mpi_send, SafePredFlag );
InitCPred( "mpi_receive", 3, p_mpi_receive, SyncPredFlag ); InitCPred( "mpi_receive", 3, p_mpi_receive, SyncPredFlag );
InitCPred( "mpi_bcast", 3, p_mpi_bcast, SyncPredFlag ); InitCPred( "mpi_bcast", 3, p_mpi_bcast3, SyncPredFlag );
InitCPred( "mpi_bcast", 2, p_mpi_bcast2, SyncPredFlag );
InitCPred( "mpi_barrier", 0, p_mpi_barrier, 0 ); InitCPred( "mpi_barrier", 0, p_mpi_barrier, 0 );
} }