added MPI_Bcast

git-svn-id: https://yap.svn.sf.net/svnroot/yap/trunk@357 b08c6af1-5177-4d33-ba66-4b1c6b8b522a
This commit is contained in:
stasinos 2002-02-12 17:38:38 +00:00
parent 8f4c6583d2
commit 39964380f1
4 changed files with 218 additions and 113 deletions

7
TO_DO
View File

@ -18,9 +18,14 @@ STK:
- start devel.tex. - start devel.tex.
- should sigactions be executed when Yap is waiting at the prompt? - should sigactions be executed when Yap is waiting at the prompt?
- see if on_signal/3 works through the C interface as well. - see if on_signal/3 works through the C interface as well.
- current_signal/3 does not know the SigId.
- see how MPI libraries interact with YAP's signal handling. - see how MPI libraries interact with YAP's signal handling.
- make the MPI interface able to pass infinite terms to MPI_Send(). - make the MPI interface able to pass infinite terms to MPI_Send().
- MPE Logging.
- Make cmd-line arg passing easier (i.e. not require run-script).
- mpi_open/3 is not really doing what it promises: the processes have
already been cloned. This can be a problem if random numbers are
used before calling mpi_open/3.
TO CHECK: TO CHECK:
- bad register allocation for a(X,Y) :- X is Y+2.3 ? - bad register allocation for a(X,Y) :- X is Y+2.3 ?

View File

@ -16,6 +16,7 @@ calc( From, To, Acc, Res ) :- !,
%% %%
%% This spreads the work among the processors %% This spreads the work among the processors
%% and collects the results.
%% %%
do(0, Num) :- do(0, Num) :-

View File

@ -0,0 +1,42 @@
%% demo2.pl -- Stasinos Konstantopoulos
%% konstant@let.rug.nl, Tue Feb 12 2002
%%
%%
%% This the calculation that needs to be performed, in this case
%% the sum of [From..To]
%%
calc( From, From, Acc, Res ) :- !,
Res is Acc + From.
calc( From, To, Acc, Res ) :- !,
Acc1 is Acc + To,
To1 is To - 1,
calc( From, To1, Acc1, Res ).
%%
%% We'll pretend the preprocessing was more complicated, and have the
%% root broadcast how many numbers each processor should do.
%% Each processor must then figure out which ones to do and call calc/4.
%%
do(0, Num) :-
!,
mpi_bcast( Num, 0 ),
format( 'Proc 0: broadcast ~q.~n', [Num] ).
do(Rank, _) :-
!,
mpi_bcast( Num, 0 ),
format( 'Proc ~q: had ~q broadcast from 0.~n', [Rank, Num] ).
%%
%% This is the entry point
%%
start(Num) :-
mpi_open( Rank, NumProc, ProcName ),
format( 'Rank: ~q NumProc: ~q, ProcName: ~q~n', [Rank,NumProc,ProcName] ),
do( Rank, Num ),
format( 'Rank ~q finished!~n', [Rank] ).

View File

@ -9,25 +9,26 @@
************************************************************************** **************************************************************************
* * * *
* File: mpi.c * * File: mpi.c *
* Last rev: $Date: 2002-02-11 20:40:09 $ * * Last rev: $Date: 2002-02-12 17:35:41 $ *
* mods: * * mods: *
* comments: Interface to an MPI library * * comments: Interface to an MPI library *
* * * *
*************************************************************************/ *************************************************************************/
#ifndef lint #ifndef lint
static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpi.c,v 1.1 2002-02-11 20:40:09 stasinos Exp $"; static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpi.c,v 1.2 2002-02-12 17:35:41 stasinos Exp $";
#endif #endif
#include "Yap.h" #include "Yap.h"
#if HAVE_MPI
#include "Yatom.h" #include "Yatom.h"
#include "yapio.h" #include "yapio.h"
/* for AtomEof */ /* for AtomEof */
#include "Heap.h" #include "Heap.h"
#if HAVE_MPI
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <mpi.h> #include <mpi.h>
@ -36,6 +37,7 @@ STATIC_PROTO (Int p_mpi_open, (void));
STATIC_PROTO (Int p_mpi_close, (void)); STATIC_PROTO (Int p_mpi_close, (void));
STATIC_PROTO (Int p_mpi_send, (void)); STATIC_PROTO (Int p_mpi_send, (void));
STATIC_PROTO (Int p_mpi_receive, (void)); STATIC_PROTO (Int p_mpi_receive, (void));
STATIC_PROTO (Int p_mpi_bcast, (void));
/* /*
@ -47,7 +49,9 @@ static char processor_name[MPI_MAX_PROCESSOR_NAME];
/* mini-stream */ /* mini-stream */
static char buf[255]; #define MAX_TERM_SIZE 1024
static char buf[MAX_TERM_SIZE + 5];
static int bufptr, buflen; static int bufptr, buflen;
static int static int
@ -70,6 +74,96 @@ mpi_eob(void)
return (bufptr<buflen) && (buf[bufptr] != EOF); return (bufptr<buflen) && (buf[bufptr] != EOF);
} }
/* Term parser */
static Term
mpi_parse(void)
{
Term v, t;
TokEntry *tokstart;
tr_fr_ptr old_TR;
old_TR = TR;
while( TRUE ) {
CELL *old_H = H;
/* Scans the term using stack space */
eot_before_eof = FALSE;
/* the first arg is the getc_for_read, diff only if CharConv is on */
tokstart = tokptr = toktide = tokenizer( mpi_getc, mpi_getc );
if ( mpi_eob() && !eot_before_eof) {
if (tokstart != NIL && tokstart->Tok != Ord (eot_tok)) {
/* we got the end of file from an abort */
if (ErrorMessage == "Abort") {
TR = old_TR;
return(NULL);
}
/* we need to force the next reading to also give end of file.*/
buf[bufptr] = EOF;
ErrorMessage = "[ Error: end of file found before end of term ]";
} else {
/* restore TR */
TR = old_TR;
if( unify_constant (ARG2, MkAtomTerm (AtomEof)) ) {
/* this might be a reasonable place to reach, but i don't know when */
puts("1XXXXXXXXXXXXXXXXXX");
return(NULL);
}
else {
puts("2XXXXXXXXXXXXXXXXXX");
return NULL;
}
}
}
repeat_cycle:
if (ErrorMessage || (t = Parse ()) == 0) {
if (ErrorMessage && (strcmp(ErrorMessage,"Stack Overflow") == 0)) {
/* ignore term we just built */
H = old_H;
if (growstack_in_parser(&old_TR, &tokstart, &VarTable)) {
tokptr = toktide = tokstart;
ErrorMessage = NULL;
goto repeat_cycle;
}
}
TR = old_TR;
if (ErrorMessage)
YP_fprintf (YP_stderr, "%s", ErrorMessage);
else
syntax_error (tokstart);
YP_fprintf (YP_stderr, " ]\n");
Error(SYSTEM_ERROR,TermNil,NULL);
return(NULL);
} else {
/* parsing succeeded */
break;
}
}
while (TRUE) {
CELL *old_H = H;
if (setjmp(IOBotch) == 0) {
v = VarNames(VarTable, TermNil);
TR = old_TR;
break;
} else {
/* don't need to recheck tokens */
tokstart = NULL;
/* restart global */
H = old_H;
growstack_in_parser(&old_TR, &tokstart, &VarTable);
old_H = H;
}
}
return t;
}
/* /*
* C Predicates * C Predicates
@ -97,7 +191,6 @@ p_mpi_close()
} }
static Int static Int
p_mpi_send() /* mpi_send(+data, +destination, +tag) */ p_mpi_send() /* mpi_send(+data, +destination, +tag) */
{ {
@ -134,44 +227,29 @@ p_mpi_send() /* mpi_send(+data, +destination, +tag) */
bufptr = 0; bufptr = 0;
/* Turn the term into its ASCII representation */ /* Turn the term into its ASCII representation */
plwrite( t_data, mpi_putc, 5 ); plwrite( t_data, mpi_putc, 5 );
buf[bufptr] = 0;
strcat( buf, ". " );
retv = MPI_Send( &buf, 255, MPI_CHAR, dest, tag, MPI_COMM_WORLD ); /* Careful: the buf is not NULL-terminated and does not have the
trailing ". " required by the parser */
retv = MPI_Send( &buf, bufptr, MPI_CHAR, dest, tag, MPI_COMM_WORLD );
if( retv == 0 ) return TRUE; if( retv == 0 ) return TRUE;
else return FALSE; else return FALSE;
} }
static Int static Int
p_mpi_receive() /* mpi_receive(-data, ?orig, ?tag) */ p_mpi_receive() /* mpi_receive(-data, ?orig, ?tag) */
{ {
Term t_orig = Deref(ARG2), t_tag = Deref(ARG3), t; Term t_data = Deref(ARG1), t_orig = Deref(ARG2), t_tag = Deref(ARG3);
int tag, orig, retv; int tag, orig, retv;
MPI_Status status; MPI_Status status;
#if 0 /* The first argument (data) must be unbound */
/* The second argument (source) must be bound to an integer if(!IsVarTerm(t_data)) {
(the rank of the source) or the atom mpi_any_source */ Error(INSTANTIATION_ERROR, t_data, "mpi_receive");
if (IsVarTerm(t_orig)) { return FALSE;
Error(INSTANTIATION_ERROR, t_orig, "mpi_receive");
return (FALSE);
} else if( IsAtomTerm(t_orig) ) {
if( AtomOfTerm(t_orig) == LookupAtom("mpi_any_source") )
orig = MPI_ANY_SOURCE;
else {
Error(TYPE_ERROR_ATOM, t_orig, "mpi_receive");
return (FALSE);
} }
} else if( !IsIntegerTerm(t_orig) ) {
Error(TYPE_ERROR_INTEGER, t_orig, "mpi_receive");
return (FALSE);
} else {
orig = IntOfTerm( t_orig );
}
#else
/* The second argument (source) must be bound to an integer /* The second argument (source) must be bound to an integer
(the rank of the source) or left unbound (i.e. any source (the rank of the source) or left unbound (i.e. any source
is OK) */ is OK) */
@ -183,10 +261,9 @@ p_mpi_receive() /* mpi_receive(-data, ?orig, ?tag) */
} else { } else {
orig = IntOfTerm( t_orig ); orig = IntOfTerm( t_orig );
} }
#endif
/* The third argument must be bound to an integer (the tag) /* The third argument must be bound to an integer (the tag)
or left unbound (i.e. any source is OK) */ or left unbound (i.e. any tag is OK) */
if (IsVarTerm(t_tag)) { if (IsVarTerm(t_tag)) {
tag = MPI_ANY_TAG; tag = MPI_ANY_TAG;
} else if( !IsIntegerTerm(t_tag) ) { } else if( !IsIntegerTerm(t_tag) ) {
@ -196,95 +273,74 @@ p_mpi_receive() /* mpi_receive(-data, ?orig, ?tag) */
tag = IntOfTerm( t_tag ); tag = IntOfTerm( t_tag );
/* Receive the message as a C string */ /* Receive the message as a C string */
retv = MPI_Recv( buf, 255, MPI_CHAR, orig, tag, MPI_COMM_WORLD, &status ); retv = MPI_Recv( buf, MAX_TERM_SIZE, MPI_CHAR, orig, tag, MPI_COMM_WORLD, &status );
if( retv != 0 ) return FALSE; if( retv != 0 ) return FALSE;
if( orig == MPI_ANY_SOURCE ) unify(t_orig, MkIntTerm(status.MPI_SOURCE)); if( orig == MPI_ANY_SOURCE ) unify(t_orig, MkIntTerm(status.MPI_SOURCE));
if( tag == MPI_ANY_TAG ) unify(t_tag, MkIntTerm(status.MPI_TAG)); if( tag == MPI_ANY_TAG ) unify(t_tag, MkIntTerm(status.MPI_TAG));
buflen = strlen( buf ); /* NULL-terminate the string and add the ". " termination
required by the parser. */
buf[status.count] = 0;
strcat( buf, ". " );
buflen = status.count + 2;
bufptr = 0; bufptr = 0;
/* parse received string into a Prolog term */ /* parse received string into a Prolog term */
{ return unify(mpi_parse(), ARG1);
Term v;
TokEntry *tokstart, *fast_tokenizer (void);
tr_fr_ptr old_TR;
old_TR = TR;
while( TRUE ) {
CELL *old_H = H;
/* Scans the term using stack space */
eot_before_eof = FALSE;
/* the first arg is the getc_for_read, diff only if CharConv is on */
tokstart = tokptr = toktide = tokenizer( mpi_getc, mpi_getc );
if ( mpi_eob() && !eot_before_eof) {
if (tokstart != NIL && tokstart->Tok != Ord (eot_tok)) {
/* we got the end of file from an abort */
if (ErrorMessage == "Abort") {
TR = old_TR;
return(FALSE);
}
/* we need to force the next reading to also give end of file.*/
buf[bufptr] = EOF;
ErrorMessage = "[ Error: end of file found before end of term ]";
} else {
/* restore TR */
TR = old_TR;
return (unify_constant (ARG2, MkAtomTerm (AtomEof)));
}
}
repeat_cycle:
if (ErrorMessage || (t = Parse ()) == 0) {
if (ErrorMessage && (strcmp(ErrorMessage,"Stack Overflow") == 0)) {
/* ignore term we just built */
H = old_H;
if (growstack_in_parser(&old_TR, &tokstart, &VarTable)) {
tokptr = toktide = tokstart;
ErrorMessage = NULL;
goto repeat_cycle;
}
}
TR = old_TR;
if (ErrorMessage)
YP_fprintf (YP_stderr, "%s", ErrorMessage);
else
syntax_error (tokstart);
YP_fprintf (YP_stderr, " ]\n");
Error(SYSTEM_ERROR,TermNil,NULL);
return(FALSE);
} else {
/* parsing succeeded */
break;
}
}
while (TRUE) {
CELL *old_H = H;
if (setjmp(IOBotch) == 0) {
v = VarNames(VarTable, TermNil);
TR = old_TR;
break;
} else {
/* don't need to recheck tokens */
tokstart = NULL;
/* restart global */
H = old_H;
growstack_in_parser(&old_TR, &tokstart, &VarTable);
old_H = H;
}
}
}
return unify(t, ARG1);
} }
static Int
p_mpi_bcast() /* mpi_bcast( ?data, +root ) */
{
Term t_data = Deref(ARG1), t_root = Deref(ARG2);
int root, retv;
/* The second argument must be bound to an integer (the rank of
root processor */
if (IsVarTerm(t_root)) {
Error(INSTANTIATION_ERROR, t_root, "mpi_bcast");
return FALSE;
}
root = IntOfTerm( t_root );
/* If this is the root processor, then the first argument must
be bound to the term to be sent. */
if( root == rank ) {
if( IsVarTerm(t_data) ) {
Error(INSTANTIATION_ERROR, t_data, "mpi_bcast");
return FALSE;
}
bufptr = 0;
/* Turn the term into its ASCII representation */
plwrite( t_data, mpi_putc, 5 );
/* NULL-terminate the string and add the ". " termination
required by the parser. */
buf[bufptr] = 0;
strcat( buf, ". " );
}
retv = MPI_Bcast( buf, MAX_TERM_SIZE, MPI_CHAR, root, MPI_COMM_WORLD );
if( retv != 0 ) return FALSE;
if( root == rank ) return TRUE;
else {
/* ARG1 must be unbound so that it can receive data */
if( !IsVarTerm(t_data) ) {
Error(INSTANTIATION_ERROR, t_root, "mpi_bcast");
return FALSE;
}
buflen = strlen(buf);
bufptr = 0;
/* parse received string into a Prolog term */
return unify(mpi_parse(), ARG1);
}
}
/* /*
* Init * Init
*/ */
@ -336,6 +392,7 @@ InitMPI(void)
InitCPred( "mpi_close", 0, p_mpi_close, SafePredFlag ); InitCPred( "mpi_close", 0, p_mpi_close, SafePredFlag );
InitCPred( "mpi_send", 3, p_mpi_send, SafePredFlag ); InitCPred( "mpi_send", 3, p_mpi_send, SafePredFlag );
InitCPred( "mpi_receive", 3, p_mpi_receive, SafePredFlag ); InitCPred( "mpi_receive", 3, p_mpi_receive, SafePredFlag );
InitCPred( "mpi_bcast", 2, p_mpi_bcast, SafePredFlag );
} }
#endif /* HAVE_MPI */ #endif /* HAVE_MPI */