remove the compile-time term size limit. do not clone before mpi_open/3 is invoked.

git-svn-id: https://yap.svn.sf.net/svnroot/yap/trunk@392 b08c6af1-5177-4d33-ba66-4b1c6b8b522a
This commit is contained in:
stasinos 2002-02-27 13:41:24 +00:00
parent d5a0f6d9ec
commit c53dc217bb
5 changed files with 173 additions and 81 deletions

4
TO_DO
View File

@ -20,11 +20,7 @@ STK:
- see if on_signal/3 works through the C interface as well.
- current_signal/3 does not know the SigId.
- see how MPI libraries interact with YAP's signal handling.
- make the MPI interface able to pass infinite terms to MPI_Send().
- Make cmd-line arg passing easier (i.e. not require run-script).
- mpi_open/3 is not really doing what it promises: the processes have
already been cloned. This can be a problem if random numbers are
used before calling mpi_open/3.
- atomic (as opposed or to numerical) tags (and communicators).
- configure.in:236: check for mpicc in the user's prefix, if any

View File

@ -19,8 +19,8 @@ calc( From, To, Acc, Res ) :- !,
%% and collects the results.
%%
do(0, Num) :-
!,
do(0, Num):-
integer(Num), !,
Half is Num // 2,
format( 'Proc 0: Calculating ~q..~q~n', [1, Half] ),
calc( 1, Half, 0, R1 ),
@ -30,21 +30,30 @@ do(0, Num) :-
% mpi_receive( R2, 1, 1 ), % Be more particular
Res is R1 + R2,
format( 'Sum(1..~q) = ~q~n', [Num,Res] ).
do(1, Num) :-
!,
do(1, Num):-
integer(Num), !,
Half is Num // 2,
format( 'Proc 1: Calculating ~q..~q~n', [Half,Num] ),
calc( Half, Num, 0, Res ),
format( 'Proc 1: Done! (~q)~n', [Res] ),
mpi_send( Res, 0, 1 ).
do(0, _):-
!,
mpi_receive(T, Source, Tag),
format( '0: Proc ~q said: ~q (Tag: ~q)~n', [Source,T,Tag] ).
do(1, Term):-
!,
mpi_send(Term, 0, 1),
format( "1: I sent ~q (Tag: 1) to 0~n", [Term] ).
%%
%% This is the entry point
%%
start(Num) :-
start(Job) :-
mpi_open( Rank, NumProc, ProcName ),
format( 'Rank: ~q NumProc: ~q, ProcName: ~q~n', [Rank,NumProc,ProcName] ),
do( Rank, Num ),
do( Rank, Job ),
format( 'Rank ~q finished!~n', [Rank] ).

View File

@ -26,22 +26,22 @@ do(0, Num) :-
mpe_create_event(Ev1),
mpe_create_event(Ev2),
format( "Ev1 == ~q, Ev2 == ~q~n", [Ev1,Ev2] ),
mpe_create_state(Ev1,Ev2),
mpe_create_state(Ev1,Ev2,state1,red),
format( "1 AA~n", [] ),
mpe_log(Ev1),
mpe_log(Ev1,0,event1),
format( "2 AA~n", [] ),
mpi_bcast( Num, 0 ),
mpi_bcast( Num, 0, 100 ),
format( 'Proc 0: broadcast ~q.~n', [Num] ),
mpe_log(Ev2).
mpe_log(Ev2,0,event2).
do(Rank, _) :-
!,
mpe_create_event(Ev1),
mpe_create_event(Ev2),
format( "Ev1 == ~q, Ev2 == ~q~n", [Ev1,Ev2] ),
mpe_log(Ev1),
mpi_bcast( Num, 0 ),
mpe_log(Ev1,0,event1),
mpi_bcast( Num, 0, 100 ),
format( 'Proc ~q: had ~q broadcast from 0.~n', [Rank, Num] ),
mpe_log(Ev2).
mpe_log(Ev2,0,event2).
%%
@ -54,4 +54,4 @@ start(Msg) :-
mpe_open,
do(Rank, Msg),
format( 'Rank ~q finished!~n', [Rank] ),
mpe_close.
mpe_close( demo2 ).

View File

@ -9,14 +9,14 @@
**************************************************************************
* *
* File: mpe.c *
* Last rev: $Date: 2002-02-26 15:33:16 $ *
* Last rev: $Date: 2002-02-27 13:41:24 $ *
* mods: *
* comments: Interface to an MPE library *
* *
*************************************************************************/
#ifndef lint
static char *rcsid = "$Header: ";
static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpe.c,v 1.3 2002-02-27 13:41:24 stasinos Exp $";
#endif
#include "Yap.h"
@ -102,48 +102,50 @@ p_create_event()
static Int /* mpe_create_state(+Event,+Event,+Text,+Colour) */
p_create_state()
{
Term t_start = Deref(ARG1), t_end = Deref(ARG2),
t_descr = Deref(ARG3), t_colour = Deref(ARG4);
Int start_id, end_id;
char *descr, *colour;
int retv;
/* The first and second args must be bount to integer event IDs. */
if (IsVarTerm(ARG1)) {
Error(INSTANTIATION_ERROR, ARG1, "mpe_create_state");
if (IsVarTerm(t_start)) {
Error(INSTANTIATION_ERROR, t_start, "mpe_create_state");
return (FALSE);
} else if( !IsIntegerTerm(ARG1) ) {
Error(TYPE_ERROR_INTEGER, ARG1, "mpe_create_state");
} else if( !IsIntegerTerm(t_start) ) {
Error(TYPE_ERROR_INTEGER, t_start, "mpe_create_state");
return (FALSE);
} else {
start_id = IntOfTerm(ARG1);
start_id = IntOfTerm(t_start);
}
if (IsVarTerm(ARG2)) {
Error(INSTANTIATION_ERROR, ARG2, "mpe_create_state");
if (IsVarTerm(t_end)) {
Error(INSTANTIATION_ERROR, t_end, "mpe_create_state");
return (FALSE);
} else if( !IsIntegerTerm(ARG2) ) {
Error(TYPE_ERROR_INTEGER, ARG2, "mpe_create_state");
} else if( !IsIntegerTerm(t_end) ) {
Error(TYPE_ERROR_INTEGER, t_end, "mpe_create_state");
return (FALSE);
} else {
end_id = IntOfTerm(ARG2);
end_id = IntOfTerm(t_end);
}
/* The third and fourth args must be bound to atoms. */
if (IsVarTerm(ARG3)) {
Error(INSTANTIATION_ERROR, ARG3, "mpe_create_state");
if (IsVarTerm(t_descr)) {
Error(INSTANTIATION_ERROR, t_descr, "mpe_create_state");
return (FALSE);
} else if( !IsAtomTerm(ARG3) ) {
Error(TYPE_ERROR_ATOM, ARG3, "mpe_create_state");
} else if( !IsAtomTerm(t_descr) ) {
Error(TYPE_ERROR_ATOM, t_descr, "mpe_create_state");
return (FALSE);
} else {
descr = RepAtom(AtomOfTerm(ARG3))->StrOfAE;
descr = RepAtom(AtomOfTerm(t_descr))->StrOfAE;
}
if (IsVarTerm(ARG4)) {
Error(INSTANTIATION_ERROR, ARG4, "mpe_create_state");
if (IsVarTerm(t_colour)) {
Error(INSTANTIATION_ERROR, t_colour, "mpe_create_state");
return (FALSE);
} else if( !IsAtomTerm(ARG4) ) {
Error(TYPE_ERROR_ATOM, ARG4, "mpe_create_state");
} else if( !IsAtomTerm(t_colour) ) {
Error(TYPE_ERROR_ATOM, t_colour, "mpe_create_state");
return (FALSE);
} else {
colour = RepAtom(AtomOfTerm(ARG4))->StrOfAE;
colour = RepAtom(AtomOfTerm(t_colour))->StrOfAE;
}
retv = MPE_Describe_state( (int)start_id, (int)end_id, descr, colour );
@ -204,8 +206,6 @@ p_log() /* mpe_log(+EventType, +EventNum, +EventStr) */
void
InitMPE(void)
{
MPE_Init_log();
InitCPred( "mpe_open", 0, p_init, SafePredFlag );
InitCPred( "mpe_start", 0, p_start, SafePredFlag );
InitCPred( "mpe_close", 1, p_close, SafePredFlag );

View File

@ -9,14 +9,14 @@
**************************************************************************
* *
* File: mpi.c *
* Last rev: $Date: 2002-02-26 15:34:08 $ *
* Last rev: $Date: 2002-02-27 13:41:24 $ *
* mods: *
* comments: Interface to an MPI library *
* *
*************************************************************************/
#ifndef lint
static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpi.c,v 1.4 2002-02-26 15:34:08 stasinos Exp $";
static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpi.c,v 1.5 2002-02-27 13:41:24 stasinos Exp $";
#endif
#include "Yap.h"
@ -48,18 +48,57 @@ STATIC_PROTO (Int p_mpi_barrier, (void));
static Int rank, numprocs, namelen;
static char processor_name[MPI_MAX_PROCESSOR_NAME];
static Int mpi_argc;
static char **mpi_argv;
/* mini-stream */
#define MAX_TERM_SIZE 1024
#define RECV_BUF_SIZE 4*1024
static char buf[MAX_TERM_SIZE + 5];
static int bufptr, buflen;
static size_t bufsize;
static char *buf;
static int bufptr, bufstrlen;
static void
expand_buffer( int space )
{
#if 1
/* realloc has been SIGSEGV'ing on HP-UX 10.20.
do i need to look into arcane allignment issues? */
char *tmp;
tmp = malloc( bufsize + space );
if( tmp == NULL ) {
Error(SYSTEM_ERROR, TermNil, "out of memory" );
exit_yap( EXIT_FAILURE );
}
memcpy( tmp, buf, bufsize );
free( buf );
buf = tmp;
#else
buf = realloc( buf, bufsize + space );
if( buf == NULL ) {
Error(SYSTEM_ERROR, TermNil, "out of memory" );
exit_yap( EXIT_FAILURE );
}
#endif
bufsize += space;
#if 0
printf( "New bufsize: %d\n", bufsize );
#endif
}
static int
mpi_putc(Int stream, Int ch)
{
if( ch > 0 )
if( ch > 0 ) {
if( bufptr >= bufsize ) expand_buffer( RECV_BUF_SIZE );
buf[bufptr++] = ch;
}
return ch;
}
@ -72,9 +111,10 @@ mpi_getc(Int stream)
static Int
mpi_eob(void)
{
return (bufptr<buflen) && (buf[bufptr] != EOF);
return (bufptr<bufstrlen) && (buf[bufptr] != EOF);
}
/* Term parser */
static Term
@ -177,6 +217,11 @@ p_mpi_open(void) /* mpi_open(?rank, ?num_procs, ?proc_name) */
Term t_rank = Deref(ARG1), t_numprocs = Deref(ARG2), t_procname = Deref(ARG3);
Int retv;
MPI_Init( &mpi_argc, &mpi_argv );
MPI_Comm_size( MPI_COMM_WORLD, &numprocs );
MPI_Comm_rank( MPI_COMM_WORLD, &rank );
MPI_Get_processor_name( processor_name, &namelen );
retv = unify(t_rank, MkIntTerm(rank));
retv = retv && unify(t_numprocs, MkIntTerm(numprocs));
retv = retv && unify(t_procname, MkAtomTerm(LookupAtom(processor_name)));
@ -229,13 +274,22 @@ p_mpi_send() /* mpi_send(+data, +destination, +tag) */
bufptr = 0;
/* Turn the term into its ASCII representation */
plwrite( t_data, mpi_putc, 5 );
bufstrlen = bufptr;
bufptr = 0;
/* Careful: the buf is not NULL-terminated and does not have the
trailing ". " required by the parser */
retv = MPI_Send( &buf, bufptr, MPI_CHAR, dest, tag, MPI_COMM_WORLD );
while( bufstrlen-bufptr > 0 ) {
int n;
if( retv == 0 ) return TRUE;
else return FALSE;
n = (bufstrlen-bufptr < RECV_BUF_SIZE)? (bufstrlen-bufptr) : RECV_BUF_SIZE;
/* Careful: the buf is not NULL-terminated and does not have the
trailing ". " required by the parser */
retv = MPI_Send( &buf[bufptr], n, MPI_CHAR, dest, tag, MPI_COMM_WORLD );
if( retv != 0 ) return FALSE;
bufptr += n;
}
return TRUE;
}
@ -274,30 +328,48 @@ p_mpi_receive() /* mpi_receive(-data, ?orig, ?tag) */
} else
tag = IntOfTerm( t_tag );
/* Receive the message as a C string */
retv = MPI_Recv( buf, MAX_TERM_SIZE, MPI_CHAR, orig, tag, MPI_COMM_WORLD, &status );
if( retv != 0 ) return FALSE;
bufptr = 0;
while(TRUE) {
int n;
/* Receive the message as a C string */
retv = MPI_Recv( &buf[bufptr], RECV_BUF_SIZE, MPI_CHAR, orig, tag,
MPI_COMM_WORLD, &status );
if( retv != 0 ) return FALSE;
MPI_Get_count( &status, MPI_CHAR, &n );
bufptr += n;
if( n == RECV_BUF_SIZE ) {
/* if not enough space, expand buffer */
if( bufsize - bufptr <= RECV_BUF_SIZE ) expand_buffer(RECV_BUF_SIZE);
}
else {
/* we have gotten everything */
break;
}
}
if( bufsize - bufptr <= 3 ) expand_buffer(3);
/* NULL-terminate the string and add the ". " termination
required by the parser. */
buf[bufptr] = 0;
strcat( buf, ". " );
bufstrlen = bufptr + 2;
bufptr = 0;
if( orig == MPI_ANY_SOURCE ) unify(t_orig, MkIntTerm(status.MPI_SOURCE));
if( tag == MPI_ANY_TAG ) unify(t_tag, MkIntTerm(status.MPI_TAG));
/* NULL-terminate the string and add the ". " termination
required by the parser. */
MPI_Get_count( &status, MPI_CHAR, &buflen );
buf[buflen] = 0;
strcat( buf, ". " );
buflen += 2;
bufptr = 0;
/* parse received string into a Prolog term */
return unify(mpi_parse(), ARG1);
return unify(ARG1, mpi_parse());
}
static Int
p_mpi_bcast() /* mpi_bcast( ?data, +root ) */
p_mpi_bcast() /* mpi_bcast( ?data, +root, +max_size ) */
{
Term t_data = Deref(ARG1), t_root = Deref(ARG2);
int root, retv;
Term t_data = Deref(ARG1), t_root = Deref(ARG2), t_max_size = Deref(ARG3);
int root, retv, max_size;
/* The second argument must be bound to an integer (the rank of
root processor */
@ -322,8 +394,29 @@ p_mpi_bcast() /* mpi_bcast( ?data, +root ) */
buf[bufptr] = 0;
strcat( buf, ". " );
}
bufstrlen = bufptr + 2;
retv = MPI_Bcast( buf, MAX_TERM_SIZE, MPI_CHAR, root, MPI_COMM_WORLD );
/* The third argument must be bound to an integer (the maximum length
of the broadcast term's ASCII representation */
if (IsVarTerm(t_max_size)) {
Error(INSTANTIATION_ERROR, t_max_size, "mpi_bcast");
return FALSE;
}
/* allow for the ". " bit and the NULL at the end */
max_size = IntOfTerm( t_max_size ) + 3;
#if 0
if( max_size < bufstrlen )
/* issue a warning? explode? bcast s'thing unparsable? */
#endif
/* adjust the buffer size, if necessary */
if( max_size > bufsize ) {
printf("expanding by %d\n", max_size-bufsize);
expand_buffer( max_size - bufsize );
}
retv = MPI_Bcast( buf, max_size, MPI_CHAR, root, MPI_COMM_WORLD );
if( retv != 0 ) return FALSE;
if( root == rank ) return TRUE;
@ -334,7 +427,7 @@ p_mpi_bcast() /* mpi_bcast( ?data, +root ) */
return FALSE;
}
buflen = strlen(buf);
bufstrlen = strlen(buf);
bufptr = 0;
/* parse received string into a Prolog term */
@ -350,7 +443,6 @@ p_mpi_barrier() /* mpi_barrier/0 */
retv = MPI_Barrier( MPI_COMM_WORLD );
printf( "MPI_Barrier() returns %d\n", retv );
return (retv == 0);
}
@ -365,12 +457,13 @@ void
InitMPI(void)
{
int i,j;
Int mpi_argc;
char **mpi_argv;
mpi_argv = malloc( yap_argc * sizeof(char *) );
mpi_argv[0] = strdup( yap_args[0] );
bufsize = RECV_BUF_SIZE;
buf = malloc(bufsize * sizeof(char));
for( i=1; i<yap_argc; ++i ) {
if( !strcmp(yap_args[i], "--") ) { ++i; break; }
}
@ -397,17 +490,11 @@ InitMPI(void)
}
#endif
MPI_Init( &mpi_argc, &mpi_argv );
/* MPI_Init( &yap_argc, &yap_args ); */
MPI_Comm_size( MPI_COMM_WORLD, &numprocs );
MPI_Comm_rank( MPI_COMM_WORLD, &rank );
MPI_Get_processor_name( processor_name, &namelen );
InitCPred( "mpi_open", 3, p_mpi_open, /*SafePredFlag|SyncPredFlag*/ 0 );
InitCPred( "mpi_close", 0, p_mpi_close, SafePredFlag );
InitCPred( "mpi_send", 3, p_mpi_send, SafePredFlag );
InitCPred( "mpi_receive", 3, p_mpi_receive, SyncPredFlag );
InitCPred( "mpi_bcast", 2, p_mpi_bcast, SyncPredFlag );
InitCPred( "mpi_bcast", 3, p_mpi_bcast, SyncPredFlag );
InitCPred( "mpi_barrier", 0, p_mpi_barrier, 0 );
}