From 39964380f14a9b1e08518cf55cc9f56b7f99f063 Mon Sep 17 00:00:00 2001 From: stasinos Date: Tue, 12 Feb 2002 17:38:38 +0000 Subject: [PATCH] added MPI_Bcast git-svn-id: https://yap.svn.sf.net/svnroot/yap/trunk@357 b08c6af1-5177-4d33-ba66-4b1c6b8b522a --- TO_DO | 9 +- library/mpi/examples/demo1.pl | 1 + library/mpi/examples/demo2.pl | 42 +++++ library/mpi/mpi.c | 279 ++++++++++++++++++++-------------- 4 files changed, 218 insertions(+), 113 deletions(-) create mode 100644 library/mpi/examples/demo2.pl diff --git a/TO_DO b/TO_DO index 7aaa86a58..2dd2e75b6 100644 --- a/TO_DO +++ b/TO_DO @@ -18,10 +18,15 @@ STK: - start devel.tex. - should sigactions be executed when Yap is waiting at the prompt? - see if on_signal/3 works through the C interface as well. +- current_signal/3 does not know the SigId. - see how MPI libraries interact with YAP's signal handling. - make the MPI interface able to pass infinite terms to MPI_Send(). - - +- MPE Logging. +- Make cmd-line arg passing easier (i.e. not require run-script). +- mpi_open/3 is not really doing what it promises: the processes have + already been cloned. This can be a problem if random numbers are + used before calling mpi_open/3. + TO CHECK: - bad register allocation for a(X,Y) :- X is Y+2.3 ? diff --git a/library/mpi/examples/demo1.pl b/library/mpi/examples/demo1.pl index a851bb807..fa7d90136 100644 --- a/library/mpi/examples/demo1.pl +++ b/library/mpi/examples/demo1.pl @@ -16,6 +16,7 @@ calc( From, To, Acc, Res ) :- !, %% %% This spreads the work among the processors +%% and collects the results. %% do(0, Num) :- diff --git a/library/mpi/examples/demo2.pl b/library/mpi/examples/demo2.pl new file mode 100644 index 000000000..756e5e5f0 --- /dev/null +++ b/library/mpi/examples/demo2.pl @@ -0,0 +1,42 @@ +%% demo2.pl -- Stasinos Konstantopoulos +%% konstant@let.rug.nl, Tue Feb 12 2002 +%% + +%% +%% This the calculation that needs to be performed, in this case +%% the sum of [From..To] +%% + +calc( From, From, Acc, Res ) :- !, + Res is Acc + From. +calc( From, To, Acc, Res ) :- !, + Acc1 is Acc + To, + To1 is To - 1, + calc( From, To1, Acc1, Res ). + + +%% +%% We'll pretend the preprocessing was more complicated, and have the +%% root broadcast how many numbers each processor should do. +%% Each processor must then figure out which ones to do and call calc/4. +%% + +do(0, Num) :- + !, + mpi_bcast( Num, 0 ), + format( 'Proc 0: broadcast ~q.~n', [Num] ). +do(Rank, _) :- + !, + mpi_bcast( Num, 0 ), + format( 'Proc ~q: had ~q broadcast from 0.~n', [Rank, Num] ). + + +%% +%% This is the entry point +%% + +start(Num) :- + mpi_open( Rank, NumProc, ProcName ), + format( 'Rank: ~q NumProc: ~q, ProcName: ~q~n', [Rank,NumProc,ProcName] ), + do( Rank, Num ), + format( 'Rank ~q finished!~n', [Rank] ). diff --git a/library/mpi/mpi.c b/library/mpi/mpi.c index 1164be5f3..130cd855c 100644 --- a/library/mpi/mpi.c +++ b/library/mpi/mpi.c @@ -9,25 +9,26 @@ ************************************************************************** * * * File: mpi.c * -* Last rev: $Date: 2002-02-11 20:40:09 $ * +* Last rev: $Date: 2002-02-12 17:35:41 $ * * mods: * * comments: Interface to an MPI library * * * *************************************************************************/ #ifndef lint -static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpi.c,v 1.1 2002-02-11 20:40:09 stasinos Exp $"; +static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpi.c,v 1.2 2002-02-12 17:35:41 stasinos Exp $"; #endif #include "Yap.h" + +#if HAVE_MPI + #include "Yatom.h" #include "yapio.h" /* for AtomEof */ #include "Heap.h" -#if HAVE_MPI - #include #include #include @@ -36,6 +37,7 @@ STATIC_PROTO (Int p_mpi_open, (void)); STATIC_PROTO (Int p_mpi_close, (void)); STATIC_PROTO (Int p_mpi_send, (void)); STATIC_PROTO (Int p_mpi_receive, (void)); +STATIC_PROTO (Int p_mpi_bcast, (void)); /* @@ -47,7 +49,9 @@ static char processor_name[MPI_MAX_PROCESSOR_NAME]; /* mini-stream */ -static char buf[255]; +#define MAX_TERM_SIZE 1024 + +static char buf[MAX_TERM_SIZE + 5]; static int bufptr, buflen; static int @@ -70,6 +74,96 @@ mpi_eob(void) return (bufptrTok != Ord (eot_tok)) { + /* we got the end of file from an abort */ + if (ErrorMessage == "Abort") { + TR = old_TR; + return(NULL); + } + /* we need to force the next reading to also give end of file.*/ + buf[bufptr] = EOF; + ErrorMessage = "[ Error: end of file found before end of term ]"; + } else { + /* restore TR */ + TR = old_TR; + if( unify_constant (ARG2, MkAtomTerm (AtomEof)) ) { + /* this might be a reasonable place to reach, but i don't know when */ + puts("1XXXXXXXXXXXXXXXXXX"); + return(NULL); + } + else { + puts("2XXXXXXXXXXXXXXXXXX"); + return NULL; + } + } + } + repeat_cycle: + if (ErrorMessage || (t = Parse ()) == 0) { + if (ErrorMessage && (strcmp(ErrorMessage,"Stack Overflow") == 0)) { + /* ignore term we just built */ + H = old_H; + if (growstack_in_parser(&old_TR, &tokstart, &VarTable)) { + tokptr = toktide = tokstart; + ErrorMessage = NULL; + goto repeat_cycle; + } + } + TR = old_TR; + if (ErrorMessage) + YP_fprintf (YP_stderr, "%s", ErrorMessage); + else + syntax_error (tokstart); + YP_fprintf (YP_stderr, " ]\n"); + + Error(SYSTEM_ERROR,TermNil,NULL); + return(NULL); + + } else { + /* parsing succeeded */ + break; + } + } + + while (TRUE) { + CELL *old_H = H; + + if (setjmp(IOBotch) == 0) { + v = VarNames(VarTable, TermNil); + TR = old_TR; + break; + } else { + /* don't need to recheck tokens */ + tokstart = NULL; + /* restart global */ + H = old_H; + growstack_in_parser(&old_TR, &tokstart, &VarTable); + old_H = H; + } + } + return t; +} + + /* * C Predicates @@ -97,7 +191,6 @@ p_mpi_close() } - static Int p_mpi_send() /* mpi_send(+data, +destination, +tag) */ { @@ -134,44 +227,29 @@ p_mpi_send() /* mpi_send(+data, +destination, +tag) */ bufptr = 0; /* Turn the term into its ASCII representation */ plwrite( t_data, mpi_putc, 5 ); - buf[bufptr] = 0; - strcat( buf, ". " ); - retv = MPI_Send( &buf, 255, MPI_CHAR, dest, tag, MPI_COMM_WORLD ); + /* Careful: the buf is not NULL-terminated and does not have the + trailing ". " required by the parser */ + retv = MPI_Send( &buf, bufptr, MPI_CHAR, dest, tag, MPI_COMM_WORLD ); if( retv == 0 ) return TRUE; else return FALSE; } - static Int p_mpi_receive() /* mpi_receive(-data, ?orig, ?tag) */ { - Term t_orig = Deref(ARG2), t_tag = Deref(ARG3), t; + Term t_data = Deref(ARG1), t_orig = Deref(ARG2), t_tag = Deref(ARG3); int tag, orig, retv; MPI_Status status; -#if 0 - /* The second argument (source) must be bound to an integer - (the rank of the source) or the atom mpi_any_source */ - if (IsVarTerm(t_orig)) { - Error(INSTANTIATION_ERROR, t_orig, "mpi_receive"); - return (FALSE); - } else if( IsAtomTerm(t_orig) ) { - if( AtomOfTerm(t_orig) == LookupAtom("mpi_any_source") ) - orig = MPI_ANY_SOURCE; - else { - Error(TYPE_ERROR_ATOM, t_orig, "mpi_receive"); - return (FALSE); - } - } else if( !IsIntegerTerm(t_orig) ) { - Error(TYPE_ERROR_INTEGER, t_orig, "mpi_receive"); - return (FALSE); - } else { - orig = IntOfTerm( t_orig ); + /* The first argument (data) must be unbound */ + if(!IsVarTerm(t_data)) { + Error(INSTANTIATION_ERROR, t_data, "mpi_receive"); + return FALSE; } -#else + /* The second argument (source) must be bound to an integer (the rank of the source) or left unbound (i.e. any source is OK) */ @@ -183,10 +261,9 @@ p_mpi_receive() /* mpi_receive(-data, ?orig, ?tag) */ } else { orig = IntOfTerm( t_orig ); } -#endif /* The third argument must be bound to an integer (the tag) - or left unbound (i.e. any source is OK) */ + or left unbound (i.e. any tag is OK) */ if (IsVarTerm(t_tag)) { tag = MPI_ANY_TAG; } else if( !IsIntegerTerm(t_tag) ) { @@ -196,95 +273,74 @@ p_mpi_receive() /* mpi_receive(-data, ?orig, ?tag) */ tag = IntOfTerm( t_tag ); /* Receive the message as a C string */ - retv = MPI_Recv( buf, 255, MPI_CHAR, orig, tag, MPI_COMM_WORLD, &status ); + retv = MPI_Recv( buf, MAX_TERM_SIZE, MPI_CHAR, orig, tag, MPI_COMM_WORLD, &status ); if( retv != 0 ) return FALSE; if( orig == MPI_ANY_SOURCE ) unify(t_orig, MkIntTerm(status.MPI_SOURCE)); if( tag == MPI_ANY_TAG ) unify(t_tag, MkIntTerm(status.MPI_TAG)); - buflen = strlen( buf ); + /* NULL-terminate the string and add the ". " termination + required by the parser. */ + buf[status.count] = 0; + strcat( buf, ". " ); + buflen = status.count + 2; bufptr = 0; /* parse received string into a Prolog term */ - { - Term v; - TokEntry *tokstart, *fast_tokenizer (void); - tr_fr_ptr old_TR; - - old_TR = TR; - while( TRUE ) { - CELL *old_H = H; - - /* Scans the term using stack space */ - eot_before_eof = FALSE; - - /* the first arg is the getc_for_read, diff only if CharConv is on */ - tokstart = tokptr = toktide = tokenizer( mpi_getc, mpi_getc ); - - if ( mpi_eob() && !eot_before_eof) { - if (tokstart != NIL && tokstart->Tok != Ord (eot_tok)) { - /* we got the end of file from an abort */ - if (ErrorMessage == "Abort") { - TR = old_TR; - return(FALSE); - } - /* we need to force the next reading to also give end of file.*/ - buf[bufptr] = EOF; - ErrorMessage = "[ Error: end of file found before end of term ]"; - } else { - /* restore TR */ - TR = old_TR; - return (unify_constant (ARG2, MkAtomTerm (AtomEof))); - } - } - repeat_cycle: - if (ErrorMessage || (t = Parse ()) == 0) { - if (ErrorMessage && (strcmp(ErrorMessage,"Stack Overflow") == 0)) { - /* ignore term we just built */ - H = old_H; - if (growstack_in_parser(&old_TR, &tokstart, &VarTable)) { - tokptr = toktide = tokstart; - ErrorMessage = NULL; - goto repeat_cycle; - } - } - TR = old_TR; - if (ErrorMessage) - YP_fprintf (YP_stderr, "%s", ErrorMessage); - else - syntax_error (tokstart); - YP_fprintf (YP_stderr, " ]\n"); - - Error(SYSTEM_ERROR,TermNil,NULL); - return(FALSE); - - } else { - /* parsing succeeded */ - break; - } - } - - while (TRUE) { - CELL *old_H = H; - - if (setjmp(IOBotch) == 0) { - v = VarNames(VarTable, TermNil); - TR = old_TR; - break; - } else { - /* don't need to recheck tokens */ - tokstart = NULL; - /* restart global */ - H = old_H; - growstack_in_parser(&old_TR, &tokstart, &VarTable); - old_H = H; - } - } - } - - return unify(t, ARG1); + return unify(mpi_parse(), ARG1); } +static Int +p_mpi_bcast() /* mpi_bcast( ?data, +root ) */ +{ + Term t_data = Deref(ARG1), t_root = Deref(ARG2); + int root, retv; + + /* The second argument must be bound to an integer (the rank of + root processor */ + if (IsVarTerm(t_root)) { + Error(INSTANTIATION_ERROR, t_root, "mpi_bcast"); + return FALSE; + } + root = IntOfTerm( t_root ); + + /* If this is the root processor, then the first argument must + be bound to the term to be sent. */ + if( root == rank ) { + if( IsVarTerm(t_data) ) { + Error(INSTANTIATION_ERROR, t_data, "mpi_bcast"); + return FALSE; + } + bufptr = 0; + /* Turn the term into its ASCII representation */ + plwrite( t_data, mpi_putc, 5 ); + /* NULL-terminate the string and add the ". " termination + required by the parser. */ + buf[bufptr] = 0; + strcat( buf, ". " ); + } + + retv = MPI_Bcast( buf, MAX_TERM_SIZE, MPI_CHAR, root, MPI_COMM_WORLD ); + if( retv != 0 ) return FALSE; + + if( root == rank ) return TRUE; + else { + /* ARG1 must be unbound so that it can receive data */ + if( !IsVarTerm(t_data) ) { + Error(INSTANTIATION_ERROR, t_root, "mpi_bcast"); + return FALSE; + } + + buflen = strlen(buf); + bufptr = 0; + + /* parse received string into a Prolog term */ + return unify(mpi_parse(), ARG1); + } +} + + + /* * Init */ @@ -336,6 +392,7 @@ InitMPI(void) InitCPred( "mpi_close", 0, p_mpi_close, SafePredFlag ); InitCPred( "mpi_send", 3, p_mpi_send, SafePredFlag ); InitCPred( "mpi_receive", 3, p_mpi_receive, SafePredFlag ); + InitCPred( "mpi_bcast", 2, p_mpi_bcast, SafePredFlag ); } #endif /* HAVE_MPI */