diff --git a/library/mpi/examples/demo2.pl b/library/mpi/examples/demo2.pl index a3272c5de..825bbf97f 100644 --- a/library/mpi/examples/demo2.pl +++ b/library/mpi/examples/demo2.pl @@ -30,7 +30,7 @@ do(0, Num) :- format( "1 AA~n", [] ), mpe_log(Ev1,0,event1), format( "2 AA~n", [] ), - mpi_bcast( Num, 0, 100 ), + mpi_bcast( Num, 0 ), format( 'Proc 0: broadcast ~q.~n', [Num] ), mpe_log(Ev2,0,event2). do(Rank, _) :- @@ -39,7 +39,7 @@ do(Rank, _) :- mpe_create_event(Ev2), format( "Ev1 == ~q, Ev2 == ~q~n", [Ev1,Ev2] ), mpe_log(Ev1,0,event1), - mpi_bcast( Num, 0, 100 ), + mpi_bcast( Num, 0 ), format( 'Proc ~q: had ~q broadcast from 0.~n', [Rank, Num] ), mpe_log(Ev2,0,event2). @@ -51,7 +51,13 @@ do(Rank, _) :- start(Msg) :- mpi_open( Rank, NumProc, ProcName ), format( 'Rank: ~q NumProc: ~q, ProcName: ~q~n', [Rank,NumProc,ProcName] ), - mpe_open, + (mpe_open -> true ; + assert_static(mpe_create_event(dummy)), + assert_static(mpe_create_state(_,_,_,_)), + assert_static(mpe_log(_,_,_)), + assert_static(mpe_close(_)) + ), do(Rank, Msg), format( 'Rank ~q finished!~n', [Rank] ), mpe_close( demo2 ). + diff --git a/library/mpi/mpi.c b/library/mpi/mpi.c index 99dbf0857..00d0f0953 100644 --- a/library/mpi/mpi.c +++ b/library/mpi/mpi.c @@ -9,14 +9,14 @@ ************************************************************************** * * * File: mpi.c * -* Last rev: $Date: 2002-03-13 09:01:39 $ * +* Last rev: $Date: 2002-10-03 17:28:37 $ * * mods: * * comments: Interface to an MPI library * * * *************************************************************************/ #ifndef lint -static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpi.c,v 1.7 2002-03-13 09:01:39 stasinos Exp $"; +static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpi.c,v 1.8 2002-10-03 17:28:37 stasinos Exp $"; #endif #include "Yap.h" @@ -49,12 +49,15 @@ STATIC_PROTO (Int p_mpi_barrier, (void)); static Int rank, numprocs, namelen; static char processor_name[MPI_MAX_PROCESSOR_NAME]; +/* used by the parser */ +static int StartLine; + static Int mpi_argc; static char **mpi_argv; /* mini-stream */ -#define RECV_BUF_SIZE 4*1024 +#define RECV_BUF_SIZE 1024*32 static size_t bufsize, bufstrlen; static char *buf; @@ -63,17 +66,16 @@ static int bufptr; static void expand_buffer( int space ) { - -#if 1 - /* realloc has been SIGSEGV'ing on HP-UX 10.20. - do i need to look into arcane allignment issues? */ +#if MPI_AVOID_REALLOC + /* + realloc() has been SIGSEGV'ing on HP-UX 10.20, but there is + no problem in HP-UX 11.0. We can remove this bit here as soon + as Yap stops compiling on 10.20 anyway. If removed, also remove + the MPI_AVOID_REALLOC bits from configure.in and config.h.in + */ char *tmp; -#if 0 - printf( "expanding by %d...", space ); -#endif - tmp = malloc( bufsize + space ); if( tmp == NULL ) { Error(SYSTEM_ERROR, TermNil, "out of memory" ); @@ -82,25 +84,23 @@ expand_buffer( int space ) memcpy( tmp, buf, bufsize ); free( buf ); buf = tmp; -#else +#else /* use realloc */ buf = realloc( buf, bufsize + space ); if( buf == NULL ) { - Error(SYSTEM_ERROR, TermNil, "out of memory" ); + Error(SYSTEM_ERROR, TermNil, "out of memory"); exit_yap( EXIT_FAILURE ); } #endif bufsize += space; - -#if 0 - printf("SUCCESS\n"); - printf( "New bufsize: %d\n", bufsize ); -#endif } static int mpi_putc(Int stream, Int ch) { +#if 0 + printf("%d: PUTC %d a.k.a. %c at %d\n", rank, ch, (char)ch, bufptr); +#endif if( ch > 0 ) { if( bufptr >= bufsize ) expand_buffer( RECV_BUF_SIZE ); buf[bufptr++] = ch; @@ -111,6 +111,9 @@ mpi_putc(Int stream, Int ch) static Int mpi_getc(Int stream) { +#if 0 + printf("%d: GETC %c at %d\n", rank, buf[bufptr], bufptr); +#endif return buf[bufptr++]; } @@ -123,6 +126,112 @@ mpi_eob(void) /* Term parser */ +static void +clean_vars(VarEntry *p) +{ + if (p == NULL) return; + p->VarAdr = TermNil; + clean_vars(p->VarLeft); + clean_vars(p->VarRight); +} + +static Term +syntax_error (TokEntry * tokptr) +{ + Term info; + int count = 0, out = 0; + Int start, err = 0, end; + Term tf[6]; + Term *error = tf+3; + CELL *Hi = H; + + start = tokptr->TokPos; + clean_vars(VarTable); + clean_vars(AnonVarTable); + while (1) { + Term ts[2]; + + if (H > ASP-1024) { + H = Hi; + tf[3] = TermNil; + err = 0; + end = 0; + break; + } + if (tokptr == toktide) { + err = tokptr->TokPos; + out = count; + } + info = tokptr->TokInfo; + switch (tokptr->Tok) { + case Name_tok: + { + Term t0 = MkAtomTerm((Atom)info); + ts[0] = MkApplTerm(MkFunctor(LookupAtom("atom"),1),1,&t0); + } + break; + case Number_tok: + ts[0] = MkApplTerm(MkFunctor(LookupAtom("number"),1),1,&(tokptr->TokInfo)); + break; + case Var_tok: + { + Term t[3]; + VarEntry *varinfo = (VarEntry *)info; + + t[0] = MkIntTerm(0); + t[1] = StringToList(varinfo->VarRep); + if (varinfo->VarAdr == TermNil) { + t[2] = varinfo->VarAdr = MkVarTerm(); + } else { + t[2] = varinfo->VarAdr; + } + ts[0] = MkApplTerm(MkFunctor(LookupAtom("var"),3),3,t); + } + break; + case String_tok: + { + Term t0 = StringToList((char *)info); + ts[0] = MkApplTerm(MkFunctor(LookupAtom("string"),1),1,&t0); + } + break; + case Ponctuation_tok: + { + char s[2]; + s[1] = '\0'; + if (Ord (info) == 'l') { + s[0] = '('; + } else { + s[0] = (char)info; + } + ts[0] = MkAtomTerm(LookupAtom(s)); + } + } + if (tokptr->Tok == Ord (eot_tok)) { + *error = TermNil; + end = tokptr->TokPos; + break; + } + ts[1] = MkIntegerTerm(tokptr->TokPos); + *error = + MkPairTerm(MkApplTerm(MkFunctor(LookupAtom("-"),2),2,ts),TermNil); + error = RepPair(*error)+1; + count++; + tokptr = tokptr->TokNext; + } + tf[0] = MkApplTerm(MkFunctor(LookupAtom("read"),1),1,&ARG2); + { + Term t[3]; + t[0] = MkIntegerTerm(start); + t[1] = MkIntegerTerm(err); + t[2] = MkIntegerTerm(end); + tf[1] = MkApplTerm(MkFunctor(LookupAtom("between"),3),3,t); + } + tf[2] = MkAtomTerm(LookupAtom("\n<==== HERE ====>\n")); + tf[4] = MkIntegerTerm(out); + tf[5] = MkIntegerTerm(err); + return(MkApplTerm(MkFunctor(LookupAtom("syntax_error"),6),6,tf)); +} + static Term mpi_parse(void) { @@ -176,14 +285,29 @@ mpi_parse(void) } } TR = old_TR; - if (ErrorMessage) - YP_fprintf (YP_stderr, "%s", ErrorMessage); - else - syntax_error (tokstart); - YP_fprintf (YP_stderr, " ]\n"); - - Error(SYSTEM_ERROR,TermNil,NULL); - return TermNil; + if (ParserErrorStyle == QUIET_ON_PARSER_ERROR) { + /* just fail */ + return(FALSE); + } else if (ParserErrorStyle == CONTINUE_ON_PARSER_ERROR) { + ErrorMessage = NULL; + /* try again */ + goto repeat_cycle; + } else { + Term terr = syntax_error(tokstart); + if (ErrorMessage == NULL) + ErrorMessage = "SYNTAX ERROR"; + + if (ParserErrorStyle == EXCEPTION_ON_PARSER_ERROR) { + Error(SYNTAX_ERROR,terr,ErrorMessage); + return(FALSE); + } else /* FAIL ON PARSER ERROR */ { + Term t[2]; + t[0] = terr; + t[1] = MkAtomTerm(LookupAtom(ErrorMessage)); + return(unify(MkIntTerm(StartLine = tokstart->TokPos),ARG4) && + unify(ARG5,MkApplTerm(MkFunctor(LookupAtom("error"),2),2,t))); + } + } } else { /* parsing succeeded */ @@ -279,20 +403,24 @@ p_mpi_send() /* mpi_send(+data, +destination, +tag) */ bufptr = 0; /* Turn the term into its ASCII representation */ plwrite( t_data, mpi_putc, 5 ); - bufstrlen = bufptr; + bufstrlen = (size_t)bufptr; + + /* The buf is not NULL-terminated and does not have the + trailing ". " required by the parser */ + mpi_putc( 0, '.' ); + mpi_putc( 0, ' ' ); + + buf[bufptr] = 0; + bufstrlen = bufptr + 1; bufptr = 0; - while( bufstrlen-bufptr > 0 ) { - int n; + /* first send the size */ + retv = MPI_Send( &bufstrlen, 1, MPI_INT, dest, tag, MPI_COMM_WORLD ); + if( retv != MPI_SUCCESS ) return FALSE; - n = (bufstrlen-bufptr < RECV_BUF_SIZE)? (bufstrlen-bufptr) : RECV_BUF_SIZE; - - /* Careful: the buf is not NULL-terminated and does not have the - trailing ". " required by the parser */ - retv = MPI_Send( &buf[bufptr], n, MPI_CHAR, dest, tag, MPI_COMM_WORLD ); - if( retv != 0 ) return FALSE; - bufptr += n; - } + /* and then the data */ + retv = MPI_Send( &buf[bufptr], bufstrlen, MPI_CHAR, dest, tag, MPI_COMM_WORLD ); + if( retv != MPI_SUCCESS ) return FALSE; return TRUE; } @@ -333,40 +461,59 @@ p_mpi_receive() /* mpi_receive(-data, ?orig, ?tag) */ } else tag = IntOfTerm( t_tag ); - bufptr = 0; - while(TRUE) { - int n; - - /* Receive the message as a C string */ - retv = MPI_Recv( &buf[bufptr], RECV_BUF_SIZE, MPI_CHAR, orig, tag, - MPI_COMM_WORLD, &status ); - if( retv != 0 ) return FALSE; - MPI_Get_count( &status, MPI_CHAR, &n ); - bufptr += n; - - if( n == RECV_BUF_SIZE ) { - /* if not enough space, expand buffer */ - if( bufsize - bufptr <= RECV_BUF_SIZE ) expand_buffer(RECV_BUF_SIZE); - } - else { - /* we have gotten everything */ - break; - } + /* receive the size of the term */ + retv = MPI_Recv( &bufstrlen, 1, MPI_INT, orig, tag, + MPI_COMM_WORLD, &status ); + if( retv != MPI_SUCCESS ) { + printf("BOOOOOOOM! retv == %d\n", retv); + return FALSE; } - if( bufsize - bufptr <= 3 ) expand_buffer(3); - /* NULL-terminate the string and add the ". " termination - required by the parser. */ - buf[bufptr] = 0; - strcat( buf, ". " ); - bufstrlen = bufptr + 2; - bufptr = 0; +#if 1 + printf("About to receive %d chars from %d\n", bufstrlen, orig); +#endif - if( orig == MPI_ANY_SOURCE ) unify(t_orig, MkIntTerm(status.MPI_SOURCE)); - if( tag == MPI_ANY_TAG ) unify(t_tag, MkIntTerm(status.MPI_TAG)); + /* adjust the buffer */ + if( bufsize < bufstrlen ) expand_buffer(bufstrlen-bufsize); + + /* Only the first packet can be from MPI_ANY_SOURCE */ + if( orig == MPI_ANY_SOURCE ) { + orig = status.MPI_SOURCE; + retv = unify(t_orig, MkIntTerm(orig)); + if( retv == FALSE ) puts( "PROBLEM1" ); + } + + /* Only the first packet can be of MPI_ANY_TAG */ + if( tag == MPI_ANY_TAG ) { + tag = status.MPI_TAG; + retv = unify(t_tag, MkIntTerm(status.MPI_TAG)); + if( retv == FALSE ) puts( "PROBLEM2" ); + } + + /* Receive the message as a C string */ + retv = MPI_Recv( &buf[bufptr], bufstrlen, MPI_CHAR, orig, tag, + MPI_COMM_WORLD, &status ); + if( retv != MPI_SUCCESS ) { + printf("BOOOOOOOM! retv == %d\n", retv); + return FALSE; + } + +#if 1 + { + int aa; + MPI_Get_count( &status, MPI_CHAR, &aa ); + printf("Received %d chars from %d\n", aa, orig); + } +#endif /* parse received string into a Prolog term */ - return unify(ARG1, mpi_parse()); + bufptr = 0; + retv = unify(t_data, mpi_parse()); +#if 1 + printf("mpi_receive: t_data == %d, retv == %d\n", t_data, retv); +#endif + + return retv; } @@ -451,9 +598,10 @@ p_mpi_bcast3() /* mpi_bcast( ?data, +root, +max_size ) */ } -/* This is the same as above, but for dynamic data size. - It is implemented as two broadcasts, the first being the size - and the second the actual data. +/* + This is the same as above, but for dynamic data size. + It is implemented as two broadcasts, the first being the size + and the second the actual data. */ static Int p_mpi_bcast2() /* mpi_bcast( ?data, +root ) */ @@ -469,6 +617,7 @@ p_mpi_bcast2() /* mpi_bcast( ?data, +root ) */ } root = IntOfTerm( t_root ); + /* If this is the root processor, then the first argument must be bound to the term to be sent. */ if( root == rank ) { @@ -478,27 +627,45 @@ p_mpi_bcast2() /* mpi_bcast( ?data, +root ) */ } bufptr = 0; /* Turn the term into its ASCII representation */ +puts("1"); plwrite( t_data, mpi_putc, 5 ); /* NULL-terminate the string and add the ". " termination required by the parser. */ +puts("1"); buf[bufptr] = 0; strcat( buf, ". " ); bufstrlen = bufptr + 2; } + /* Otherwise, it must a variable */ + else { + if( !IsVarTerm(t_data) ) { + Error(INSTANTIATION_ERROR, t_data, "mpi_bcast"); + return FALSE; + } + } + /* Broadcast the data size */ retv = MPI_Bcast( &bufstrlen, sizeof bufstrlen, MPI_INT, root, MPI_COMM_WORLD ); - if( retv != 0 ) return FALSE; + if( retv != MPI_SUCCESS ) { + printf("PROBLEM: file %s, line %d\n", __FILE__, __LINE__); + return FALSE; + } /* adjust the buffer size, if necessary */ if( bufstrlen > bufsize ) { printf("expanding by %d\n", (bufstrlen-bufsize) ); expand_buffer( bufstrlen - bufsize ); } - + else { + printf("bufstrlen: %d, bufsize %d: not expanding\n",bufstrlen,bufsize); + } /* Broadcast the data */ retv = MPI_Bcast( buf, bufstrlen, MPI_CHAR, root, MPI_COMM_WORLD ); - if( retv != 0 ) return FALSE; + if( retv != MPI_SUCCESS ) { + printf("PROBLEM: file %s, line %d\n", __FILE__, __LINE__); + return FALSE; + } if( root == rank ) return TRUE; else { @@ -571,13 +738,13 @@ InitMPI(void) } #endif - InitCPred( "mpi_open", 3, p_mpi_open, /*SafePredFlag|SyncPredFlag*/ 0 ); - InitCPred( "mpi_close", 0, p_mpi_close, SafePredFlag ); - InitCPred( "mpi_send", 3, p_mpi_send, SafePredFlag ); - InitCPred( "mpi_receive", 3, p_mpi_receive, SyncPredFlag ); - InitCPred( "mpi_bcast", 3, p_mpi_bcast3, SyncPredFlag ); - InitCPred( "mpi_bcast", 2, p_mpi_bcast2, SyncPredFlag ); - InitCPred( "mpi_barrier", 0, p_mpi_barrier, 0 ); + InitCPred( "mpi_open", 3, p_mpi_open, SafePredFlag|SyncPredFlag ); + InitCPred( "mpi_close", 0, p_mpi_close, SyncPredFlag ); + InitCPred( "mpi_send", 3, p_mpi_send, SafePredFlag|SyncPredFlag ); + InitCPred( "mpi_receive", 3, p_mpi_receive, SafePredFlag|SyncPredFlag ); + InitCPred( "mpi_bcast", 3, p_mpi_bcast3, SafePredFlag|SyncPredFlag ); + InitCPred( "mpi_bcast", 2, p_mpi_bcast2, SafePredFlag|SyncPredFlag ); + InitCPred( "mpi_barrier", 0, p_mpi_barrier, SyncPredFlag ); } #endif /* HAVE_MPI */