From c53dc217bb2455bbe25034581af98c62da0ab845 Mon Sep 17 00:00:00 2001 From: stasinos Date: Wed, 27 Feb 2002 13:41:24 +0000 Subject: [PATCH] remove the compile-time term size limit. do not clone before mpi_open/3 is invoked. git-svn-id: https://yap.svn.sf.net/svnroot/yap/trunk@392 b08c6af1-5177-4d33-ba66-4b1c6b8b522a --- TO_DO | 4 - library/mpi/examples/demo1.pl | 21 +++-- library/mpi/examples/demo2.pl | 16 ++-- library/mpi/mpe.c | 48 +++++----- library/mpi/mpi.c | 165 ++++++++++++++++++++++++++-------- 5 files changed, 173 insertions(+), 81 deletions(-) diff --git a/TO_DO b/TO_DO index dfe7b0a91..2937df00c 100644 --- a/TO_DO +++ b/TO_DO @@ -20,11 +20,7 @@ STK: - see if on_signal/3 works through the C interface as well. - current_signal/3 does not know the SigId. - see how MPI libraries interact with YAP's signal handling. -- make the MPI interface able to pass infinite terms to MPI_Send(). - Make cmd-line arg passing easier (i.e. not require run-script). -- mpi_open/3 is not really doing what it promises: the processes have - already been cloned. This can be a problem if random numbers are - used before calling mpi_open/3. - atomic (as opposed or to numerical) tags (and communicators). - configure.in:236: check for mpicc in the user's prefix, if any diff --git a/library/mpi/examples/demo1.pl b/library/mpi/examples/demo1.pl index fa7d90136..19fe22d66 100644 --- a/library/mpi/examples/demo1.pl +++ b/library/mpi/examples/demo1.pl @@ -19,8 +19,8 @@ calc( From, To, Acc, Res ) :- !, %% and collects the results. %% -do(0, Num) :- - !, +do(0, Num):- + integer(Num), !, Half is Num // 2, format( 'Proc 0: Calculating ~q..~q~n', [1, Half] ), calc( 1, Half, 0, R1 ), @@ -30,21 +30,30 @@ do(0, Num) :- % mpi_receive( R2, 1, 1 ), % Be more particular Res is R1 + R2, format( 'Sum(1..~q) = ~q~n', [Num,Res] ). -do(1, Num) :- - !, +do(1, Num):- + integer(Num), !, Half is Num // 2, format( 'Proc 1: Calculating ~q..~q~n', [Half,Num] ), calc( Half, Num, 0, Res ), format( 'Proc 1: Done! (~q)~n', [Res] ), mpi_send( Res, 0, 1 ). +do(0, _):- + !, + mpi_receive(T, Source, Tag), + format( '0: Proc ~q said: ~q (Tag: ~q)~n', [Source,T,Tag] ). +do(1, Term):- + !, + mpi_send(Term, 0, 1), + format( "1: I sent ~q (Tag: 1) to 0~n", [Term] ). + %% %% This is the entry point %% -start(Num) :- +start(Job) :- mpi_open( Rank, NumProc, ProcName ), format( 'Rank: ~q NumProc: ~q, ProcName: ~q~n', [Rank,NumProc,ProcName] ), - do( Rank, Num ), + do( Rank, Job ), format( 'Rank ~q finished!~n', [Rank] ). diff --git a/library/mpi/examples/demo2.pl b/library/mpi/examples/demo2.pl index 12222da5b..a3272c5de 100644 --- a/library/mpi/examples/demo2.pl +++ b/library/mpi/examples/demo2.pl @@ -26,22 +26,22 @@ do(0, Num) :- mpe_create_event(Ev1), mpe_create_event(Ev2), format( "Ev1 == ~q, Ev2 == ~q~n", [Ev1,Ev2] ), - mpe_create_state(Ev1,Ev2), + mpe_create_state(Ev1,Ev2,state1,red), format( "1 AA~n", [] ), - mpe_log(Ev1), + mpe_log(Ev1,0,event1), format( "2 AA~n", [] ), - mpi_bcast( Num, 0 ), + mpi_bcast( Num, 0, 100 ), format( 'Proc 0: broadcast ~q.~n', [Num] ), - mpe_log(Ev2). + mpe_log(Ev2,0,event2). do(Rank, _) :- !, mpe_create_event(Ev1), mpe_create_event(Ev2), format( "Ev1 == ~q, Ev2 == ~q~n", [Ev1,Ev2] ), - mpe_log(Ev1), - mpi_bcast( Num, 0 ), + mpe_log(Ev1,0,event1), + mpi_bcast( Num, 0, 100 ), format( 'Proc ~q: had ~q broadcast from 0.~n', [Rank, Num] ), - mpe_log(Ev2). + mpe_log(Ev2,0,event2). %% @@ -54,4 +54,4 @@ start(Msg) :- mpe_open, do(Rank, Msg), format( 'Rank ~q finished!~n', [Rank] ), - mpe_close. + mpe_close( demo2 ). diff --git a/library/mpi/mpe.c b/library/mpi/mpe.c index f367cbe3d..44ebcdc0b 100644 --- a/library/mpi/mpe.c +++ b/library/mpi/mpe.c @@ -9,14 +9,14 @@ ************************************************************************** * * * File: mpe.c * -* Last rev: $Date: 2002-02-26 15:33:16 $ * +* Last rev: $Date: 2002-02-27 13:41:24 $ * * mods: * * comments: Interface to an MPE library * * * *************************************************************************/ #ifndef lint -static char *rcsid = "$Header: "; +static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpe.c,v 1.3 2002-02-27 13:41:24 stasinos Exp $"; #endif #include "Yap.h" @@ -102,48 +102,50 @@ p_create_event() static Int /* mpe_create_state(+Event,+Event,+Text,+Colour) */ p_create_state() { + Term t_start = Deref(ARG1), t_end = Deref(ARG2), + t_descr = Deref(ARG3), t_colour = Deref(ARG4); Int start_id, end_id; char *descr, *colour; int retv; /* The first and second args must be bount to integer event IDs. */ - if (IsVarTerm(ARG1)) { - Error(INSTANTIATION_ERROR, ARG1, "mpe_create_state"); + if (IsVarTerm(t_start)) { + Error(INSTANTIATION_ERROR, t_start, "mpe_create_state"); return (FALSE); - } else if( !IsIntegerTerm(ARG1) ) { - Error(TYPE_ERROR_INTEGER, ARG1, "mpe_create_state"); + } else if( !IsIntegerTerm(t_start) ) { + Error(TYPE_ERROR_INTEGER, t_start, "mpe_create_state"); return (FALSE); } else { - start_id = IntOfTerm(ARG1); + start_id = IntOfTerm(t_start); } - if (IsVarTerm(ARG2)) { - Error(INSTANTIATION_ERROR, ARG2, "mpe_create_state"); + if (IsVarTerm(t_end)) { + Error(INSTANTIATION_ERROR, t_end, "mpe_create_state"); return (FALSE); - } else if( !IsIntegerTerm(ARG2) ) { - Error(TYPE_ERROR_INTEGER, ARG2, "mpe_create_state"); + } else if( !IsIntegerTerm(t_end) ) { + Error(TYPE_ERROR_INTEGER, t_end, "mpe_create_state"); return (FALSE); } else { - end_id = IntOfTerm(ARG2); + end_id = IntOfTerm(t_end); } /* The third and fourth args must be bound to atoms. */ - if (IsVarTerm(ARG3)) { - Error(INSTANTIATION_ERROR, ARG3, "mpe_create_state"); + if (IsVarTerm(t_descr)) { + Error(INSTANTIATION_ERROR, t_descr, "mpe_create_state"); return (FALSE); - } else if( !IsAtomTerm(ARG3) ) { - Error(TYPE_ERROR_ATOM, ARG3, "mpe_create_state"); + } else if( !IsAtomTerm(t_descr) ) { + Error(TYPE_ERROR_ATOM, t_descr, "mpe_create_state"); return (FALSE); } else { - descr = RepAtom(AtomOfTerm(ARG3))->StrOfAE; + descr = RepAtom(AtomOfTerm(t_descr))->StrOfAE; } - if (IsVarTerm(ARG4)) { - Error(INSTANTIATION_ERROR, ARG4, "mpe_create_state"); + if (IsVarTerm(t_colour)) { + Error(INSTANTIATION_ERROR, t_colour, "mpe_create_state"); return (FALSE); - } else if( !IsAtomTerm(ARG4) ) { - Error(TYPE_ERROR_ATOM, ARG4, "mpe_create_state"); + } else if( !IsAtomTerm(t_colour) ) { + Error(TYPE_ERROR_ATOM, t_colour, "mpe_create_state"); return (FALSE); } else { - colour = RepAtom(AtomOfTerm(ARG4))->StrOfAE; + colour = RepAtom(AtomOfTerm(t_colour))->StrOfAE; } retv = MPE_Describe_state( (int)start_id, (int)end_id, descr, colour ); @@ -204,8 +206,6 @@ p_log() /* mpe_log(+EventType, +EventNum, +EventStr) */ void InitMPE(void) { - MPE_Init_log(); - InitCPred( "mpe_open", 0, p_init, SafePredFlag ); InitCPred( "mpe_start", 0, p_start, SafePredFlag ); InitCPred( "mpe_close", 1, p_close, SafePredFlag ); diff --git a/library/mpi/mpi.c b/library/mpi/mpi.c index 468eb3022..ac5bc173b 100644 --- a/library/mpi/mpi.c +++ b/library/mpi/mpi.c @@ -9,14 +9,14 @@ ************************************************************************** * * * File: mpi.c * -* Last rev: $Date: 2002-02-26 15:34:08 $ * +* Last rev: $Date: 2002-02-27 13:41:24 $ * * mods: * * comments: Interface to an MPI library * * * *************************************************************************/ #ifndef lint -static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpi.c,v 1.4 2002-02-26 15:34:08 stasinos Exp $"; +static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpi.c,v 1.5 2002-02-27 13:41:24 stasinos Exp $"; #endif #include "Yap.h" @@ -48,18 +48,57 @@ STATIC_PROTO (Int p_mpi_barrier, (void)); static Int rank, numprocs, namelen; static char processor_name[MPI_MAX_PROCESSOR_NAME]; +static Int mpi_argc; +static char **mpi_argv; + /* mini-stream */ -#define MAX_TERM_SIZE 1024 +#define RECV_BUF_SIZE 4*1024 -static char buf[MAX_TERM_SIZE + 5]; -static int bufptr, buflen; +static size_t bufsize; +static char *buf; +static int bufptr, bufstrlen; + +static void +expand_buffer( int space ) +{ + +#if 1 + /* realloc has been SIGSEGV'ing on HP-UX 10.20. + do i need to look into arcane allignment issues? */ + + char *tmp; + + tmp = malloc( bufsize + space ); + if( tmp == NULL ) { + Error(SYSTEM_ERROR, TermNil, "out of memory" ); + exit_yap( EXIT_FAILURE ); + } + memcpy( tmp, buf, bufsize ); + free( buf ); + buf = tmp; +#else + buf = realloc( buf, bufsize + space ); + if( buf == NULL ) { + Error(SYSTEM_ERROR, TermNil, "out of memory" ); + exit_yap( EXIT_FAILURE ); + } +#endif + + bufsize += space; + +#if 0 + printf( "New bufsize: %d\n", bufsize ); +#endif +} static int mpi_putc(Int stream, Int ch) { - if( ch > 0 ) + if( ch > 0 ) { + if( bufptr >= bufsize ) expand_buffer( RECV_BUF_SIZE ); buf[bufptr++] = ch; + } return ch; } @@ -72,9 +111,10 @@ mpi_getc(Int stream) static Int mpi_eob(void) { - return (bufptr 0 ) { + int n; - if( retv == 0 ) return TRUE; - else return FALSE; + n = (bufstrlen-bufptr < RECV_BUF_SIZE)? (bufstrlen-bufptr) : RECV_BUF_SIZE; + + /* Careful: the buf is not NULL-terminated and does not have the + trailing ". " required by the parser */ + retv = MPI_Send( &buf[bufptr], n, MPI_CHAR, dest, tag, MPI_COMM_WORLD ); + if( retv != 0 ) return FALSE; + bufptr += n; + } + + return TRUE; } @@ -274,30 +328,48 @@ p_mpi_receive() /* mpi_receive(-data, ?orig, ?tag) */ } else tag = IntOfTerm( t_tag ); - /* Receive the message as a C string */ - retv = MPI_Recv( buf, MAX_TERM_SIZE, MPI_CHAR, orig, tag, MPI_COMM_WORLD, &status ); - if( retv != 0 ) return FALSE; + bufptr = 0; + while(TRUE) { + int n; + + /* Receive the message as a C string */ + retv = MPI_Recv( &buf[bufptr], RECV_BUF_SIZE, MPI_CHAR, orig, tag, + MPI_COMM_WORLD, &status ); + if( retv != 0 ) return FALSE; + MPI_Get_count( &status, MPI_CHAR, &n ); + bufptr += n; + + if( n == RECV_BUF_SIZE ) { + /* if not enough space, expand buffer */ + if( bufsize - bufptr <= RECV_BUF_SIZE ) expand_buffer(RECV_BUF_SIZE); + } + else { + /* we have gotten everything */ + break; + } + } + + if( bufsize - bufptr <= 3 ) expand_buffer(3); + /* NULL-terminate the string and add the ". " termination + required by the parser. */ + buf[bufptr] = 0; + strcat( buf, ". " ); + bufstrlen = bufptr + 2; + bufptr = 0; + if( orig == MPI_ANY_SOURCE ) unify(t_orig, MkIntTerm(status.MPI_SOURCE)); if( tag == MPI_ANY_TAG ) unify(t_tag, MkIntTerm(status.MPI_TAG)); - /* NULL-terminate the string and add the ". " termination - required by the parser. */ - MPI_Get_count( &status, MPI_CHAR, &buflen ); - buf[buflen] = 0; - strcat( buf, ". " ); - buflen += 2; - bufptr = 0; - /* parse received string into a Prolog term */ - return unify(mpi_parse(), ARG1); + return unify(ARG1, mpi_parse()); } static Int -p_mpi_bcast() /* mpi_bcast( ?data, +root ) */ +p_mpi_bcast() /* mpi_bcast( ?data, +root, +max_size ) */ { - Term t_data = Deref(ARG1), t_root = Deref(ARG2); - int root, retv; + Term t_data = Deref(ARG1), t_root = Deref(ARG2), t_max_size = Deref(ARG3); + int root, retv, max_size; /* The second argument must be bound to an integer (the rank of root processor */ @@ -322,8 +394,29 @@ p_mpi_bcast() /* mpi_bcast( ?data, +root ) */ buf[bufptr] = 0; strcat( buf, ". " ); } + bufstrlen = bufptr + 2; - retv = MPI_Bcast( buf, MAX_TERM_SIZE, MPI_CHAR, root, MPI_COMM_WORLD ); + /* The third argument must be bound to an integer (the maximum length + of the broadcast term's ASCII representation */ + if (IsVarTerm(t_max_size)) { + Error(INSTANTIATION_ERROR, t_max_size, "mpi_bcast"); + return FALSE; + } + /* allow for the ". " bit and the NULL at the end */ + max_size = IntOfTerm( t_max_size ) + 3; + +#if 0 + if( max_size < bufstrlen ) + /* issue a warning? explode? bcast s'thing unparsable? */ +#endif + + /* adjust the buffer size, if necessary */ + if( max_size > bufsize ) { + printf("expanding by %d\n", max_size-bufsize); + expand_buffer( max_size - bufsize ); + } + + retv = MPI_Bcast( buf, max_size, MPI_CHAR, root, MPI_COMM_WORLD ); if( retv != 0 ) return FALSE; if( root == rank ) return TRUE; @@ -334,7 +427,7 @@ p_mpi_bcast() /* mpi_bcast( ?data, +root ) */ return FALSE; } - buflen = strlen(buf); + bufstrlen = strlen(buf); bufptr = 0; /* parse received string into a Prolog term */ @@ -350,7 +443,6 @@ p_mpi_barrier() /* mpi_barrier/0 */ retv = MPI_Barrier( MPI_COMM_WORLD ); - printf( "MPI_Barrier() returns %d\n", retv ); return (retv == 0); } @@ -365,12 +457,13 @@ void InitMPI(void) { int i,j; - Int mpi_argc; - char **mpi_argv; mpi_argv = malloc( yap_argc * sizeof(char *) ); mpi_argv[0] = strdup( yap_args[0] ); + bufsize = RECV_BUF_SIZE; + buf = malloc(bufsize * sizeof(char)); + for( i=1; i