diff --git a/library/mpi/mpi.c b/library/mpi/mpi.c index ac5bc173b..26a685696 100644 --- a/library/mpi/mpi.c +++ b/library/mpi/mpi.c @@ -9,14 +9,14 @@ ************************************************************************** * * * File: mpi.c * -* Last rev: $Date: 2002-02-27 13:41:24 $ * +* Last rev: $Date: 2002-03-12 20:03:55 $ * * mods: * * comments: Interface to an MPI library * * * *************************************************************************/ #ifndef lint -static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpi.c,v 1.5 2002-02-27 13:41:24 stasinos Exp $"; +static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpi.c,v 1.6 2002-03-12 20:03:55 stasinos Exp $"; #endif #include "Yap.h" @@ -37,7 +37,8 @@ STATIC_PROTO (Int p_mpi_open, (void)); STATIC_PROTO (Int p_mpi_close, (void)); STATIC_PROTO (Int p_mpi_send, (void)); STATIC_PROTO (Int p_mpi_receive, (void)); -STATIC_PROTO (Int p_mpi_bcast, (void)); +STATIC_PROTO (Int p_mpi_bcast3, (void)); +STATIC_PROTO (Int p_mpi_bcast2, (void)); STATIC_PROTO (Int p_mpi_barrier, (void)); @@ -139,7 +140,7 @@ mpi_parse(void) /* we got the end of file from an abort */ if (ErrorMessage == "Abort") { TR = old_TR; - return(NULL); + return TermNil; } /* we need to force the next reading to also give end of file.*/ buf[bufptr] = EOF; @@ -150,11 +151,11 @@ mpi_parse(void) if( unify_constant (ARG2, MkAtomTerm (AtomEof)) ) { /* this might be a reasonable place to reach, but i don't know when */ puts("1XXXXXXXXXXXXXXXXXX"); - return(NULL); + return TermNil ; } else { puts("2XXXXXXXXXXXXXXXXXX"); - return NULL; + return TermNil; } } } @@ -177,7 +178,7 @@ mpi_parse(void) YP_fprintf (YP_stderr, " ]\n"); Error(SYSTEM_ERROR,TermNil,NULL); - return(NULL); + return TermNil; } else { /* parsing succeeded */ @@ -242,7 +243,6 @@ static Int p_mpi_send() /* mpi_send(+data, +destination, +tag) */ { Term t_data = Deref(ARG1), t_dest = Deref(ARG2), t_tag = Deref(ARG3); - char *data; int tag, dest, retv; /* The first argument (data) must be bound */ @@ -366,7 +366,7 @@ p_mpi_receive() /* mpi_receive(-data, ?orig, ?tag) */ static Int -p_mpi_bcast() /* mpi_bcast( ?data, +root, +max_size ) */ +p_mpi_bcast3() /* mpi_bcast( ?data, +root, +max_size ) */ { Term t_data = Deref(ARG1), t_root = Deref(ARG2), t_max_size = Deref(ARG3); int root, retv, max_size; @@ -393,8 +393,8 @@ p_mpi_bcast() /* mpi_bcast( ?data, +root, +max_size ) */ required by the parser. */ buf[bufptr] = 0; strcat( buf, ". " ); + bufstrlen = bufptr + 2; } - bufstrlen = bufptr + 2; /* The third argument must be bound to an integer (the maximum length of the broadcast term's ASCII representation */ @@ -406,7 +406,7 @@ p_mpi_bcast() /* mpi_bcast( ?data, +root, +max_size ) */ max_size = IntOfTerm( t_max_size ) + 3; #if 0 - if( max_size < bufstrlen ) + if( (rank == root) && (max_size < bufstrlen) ) /* issue a warning? explode? bcast s'thing unparsable? */ #endif @@ -436,6 +436,76 @@ p_mpi_bcast() /* mpi_bcast( ?data, +root, +max_size ) */ } +/* This is the same as above, but for dynamic data size. + It is implemented as two broadcasts, the first being the size + and the second the actual data. +*/ +static Int +p_mpi_bcast2() /* mpi_bcast( ?data, +root ) */ +{ + Term t_data = Deref(ARG1), t_root = Deref(ARG2); + int root, retv; + + /* The second argument must be bound to an integer (the rank of + root processor */ + if (IsVarTerm(t_root)) { + Error(INSTANTIATION_ERROR, t_root, "mpi_bcast"); + return FALSE; + } + root = IntOfTerm( t_root ); + + /* If this is the root processor, then the first argument must + be bound to the term to be sent. */ + if( root == rank ) { + if( IsVarTerm(t_data) ) { + Error(INSTANTIATION_ERROR, t_data, "mpi_bcast"); + return FALSE; + } + bufptr = 0; + /* Turn the term into its ASCII representation */ + plwrite( t_data, mpi_putc, 5 ); + /* NULL-terminate the string and add the ". " termination + required by the parser. */ + buf[bufptr] = 0; + strcat( buf, ". " ); + bufstrlen = bufptr + 2; + } + + /* Broadcast the data size */ + retv = MPI_Bcast( &bufstrlen, sizeof bufstrlen, MPI_INT, root, MPI_COMM_WORLD ); + if( retv != 0 ) return FALSE; + +#if 1 + printf("I am %d and I think the data is %d bytes long!\n", rank, bufstrlen); +#endif + + /* adjust the buffer size, if necessary */ + if( bufstrlen-bufsize > 0 ) { + printf("expanding by %d\n", bufstrlen-bufsize); + expand_buffer( bufstrlen - bufsize ); + } + + /* Broadcast the data */ + retv = MPI_Bcast( buf, bufstrlen, MPI_CHAR, root, MPI_COMM_WORLD ); + if( retv != 0 ) return FALSE; + + if( root == rank ) return TRUE; + else { + /* ARG1 must be unbound so that it can receive data */ + if( !IsVarTerm(t_data) ) { + Error(INSTANTIATION_ERROR, t_root, "mpi_bcast"); + return FALSE; + } + + bufstrlen = strlen(buf); + bufptr = 0; + + /* parse received string into a Prolog term */ + return unify(mpi_parse(), ARG1); + } +} + + static Int p_mpi_barrier() /* mpi_barrier/0 */ { @@ -494,7 +564,8 @@ InitMPI(void) InitCPred( "mpi_close", 0, p_mpi_close, SafePredFlag ); InitCPred( "mpi_send", 3, p_mpi_send, SafePredFlag ); InitCPred( "mpi_receive", 3, p_mpi_receive, SyncPredFlag ); - InitCPred( "mpi_bcast", 3, p_mpi_bcast, SyncPredFlag ); + InitCPred( "mpi_bcast", 3, p_mpi_bcast3, SyncPredFlag ); + InitCPred( "mpi_bcast", 2, p_mpi_bcast2, SyncPredFlag ); InitCPred( "mpi_barrier", 0, p_mpi_barrier, 0 ); }