/************************************************************************* * * * YAP Prolog * * * * Yap Prolog was developed at NCCUP - Universidade do Porto * * * * Copyright S. Konstantopoulos and Universidade do Porto 2002-2003 * * * ************************************************************************** * * * File: mpi.c * * Last rev: $Date: 2003-07-03 15:01:18 $ * * mods: * * comments: Interface to MPI libraries * * * *************************************************************************/ #ifndef lint static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpi.c,v 1.20 2003-07-03 15:01:18 stasinos Exp $"; #endif #include "Yap.h" #if HAVE_MPI #include "Yatom.h" #include "yapio.h" #include <stdlib.h> #include <string.h> #include <mpi.h> Term STD_PROTO(YAP_Read, (int (*)(void))); void STD_PROTO(YAP_Write, (Term, void (*)(int), int)); STATIC_PROTO (Int p_mpi_open, (void)); STATIC_PROTO (Int p_mpi_close, (void)); STATIC_PROTO (Int p_mpi_send, (void)); STATIC_PROTO (Int p_mpi_receive, (void)); STATIC_PROTO (Int p_mpi_bcast3, (void)); STATIC_PROTO (Int p_mpi_bcast2, (void)); STATIC_PROTO (Int p_mpi_barrier, (void)); /* * Auxiliary Data */ static Int rank, numprocs, namelen; static char processor_name[MPI_MAX_PROCESSOR_NAME]; static Int mpi_argc; static char **mpi_argv; /* this should eventually be moved to config.h */ #define RECV_BUF_SIZE 1024*32 /* * A simple stream */ static size_t bufsize, bufstrlen; static char *buf; static int bufptr; static void expand_buffer( int space ) { #if MPI_AVOID_REALLOC /* realloc() has been SIGSEGV'ing on HP-UX 10.20, but there is no problem in HP-UX 11.0. We can remove this bit here as soon as Yap stops compiling on 10.20 anyway. If removed, also remove the MPI_AVOID_REALLOC bits from configure.in and config.h.in */ char *tmp; tmp = malloc( bufsize + space ); if( tmp == NULL ) { Yap_Error(SYSTEM_ERROR, TermNil, "out of memory" ); Yap_exit( EXIT_FAILURE ); } memcpy( tmp, buf, bufsize ); free( buf ); buf = tmp; #else /* use realloc */ buf = realloc( buf, bufsize + space ); if( buf == NULL ) { Yap_Error(SYSTEM_ERROR, TermNil, "out of memory"); Yap_exit( EXIT_FAILURE ); } #endif bufsize += space; } static void mpi_putc(Int ch) { if( ch > 0 ) { if( bufptr >= bufsize ) expand_buffer( RECV_BUF_SIZE ); buf[bufptr++] = ch; } } static Int mpi_getc(void) { if( bufptr < bufsize ) return buf[bufptr++]; else return -1; } /* * C Predicates */ static Int p_mpi_open(void) /* mpi_open(?rank, ?num_procs, ?proc_name) */ { Term t_rank = Deref(ARG1), t_numprocs = Deref(ARG2), t_procname = Deref(ARG3); Int retv; /* With MPICH MPI_Init() must be called during initialisation, but with LAM it can be called from Prolog (mpi_open/3) The symptoms match a known RedHat bug, see http://email.osc.edu/pipermail/mpiexec/2002-July/000067.html for a suggested workaround: Redhat have somehow broken their sem.h and ipc.h. If you use your own kernel, copy from ../src/kernel/include/asm & ../src/kernel/include/linux the file ipc.h and sem.h to /usr/include/sys, recompile your mpich and everything might start working. (it did for us) */ /* Note that if MPI_Init() fails, Yap/MPICH and Yap/LAM behave differently: in Yap/MPICH we are still at the Yap initialisation phase, so we get Yap exit(FAILURE), whereas in Yap/LAM mpi_open/3 simply fails. */ retv = MPI_Init( &mpi_argc, &mpi_argv ); if( retv ) { Term t; t = MkIntegerTerm(retv); Yap_Error( SYSTEM_ERROR, t, "MPI_Init() returned non-zero" ); return FALSE; } MPI_Comm_size( MPI_COMM_WORLD, &numprocs ); MPI_Comm_rank( MPI_COMM_WORLD, &rank ); MPI_Get_processor_name( processor_name, &namelen ); retv = Yap_unify(t_rank, MkIntTerm(rank)); retv = retv && Yap_unify(t_numprocs, MkIntTerm(numprocs)); retv = retv && Yap_unify(t_procname, MkAtomTerm(Yap_LookupAtom(processor_name))); return retv; } static Int /* mpi_close */ p_mpi_close() { MPI_Finalize(); return TRUE; } static Int p_mpi_send() /* mpi_send(+data, +destination, +tag) */ { Term t_data = Deref(ARG1), t_dest = Deref(ARG2), t_tag = Deref(ARG3); int tag, dest, retv; /* The first argument (data) must be bound */ if (IsVarTerm(t_data)) { Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_send"); return (FALSE); } /* The second and third args must be bount to integers */ if (IsVarTerm(t_dest)) { Yap_Error(INSTANTIATION_ERROR, t_dest, "mpi_send"); return (FALSE); } else if( !IsIntegerTerm(t_dest) ) { Yap_Error(TYPE_ERROR_INTEGER, t_dest, "mpi_send"); return (FALSE); } else { dest = IntOfTerm( t_dest ); } if (IsVarTerm(t_tag)) { Yap_Error(INSTANTIATION_ERROR, t_tag, "mpi_send"); return (FALSE); } else if( !IsIntegerTerm(t_tag) ) { Yap_Error(TYPE_ERROR_INTEGER, t_tag, "mpi_send"); return (FALSE); } else { tag = IntOfTerm( t_tag ); } /* Turn the term into its ASCII representation */ bufptr = 0; YAP_Write( t_data, mpi_putc, Quote_illegal_f|Handle_vars_f ); /* The buf is not NULL-terminated and does not have the trailing ". " required by the parser */ mpi_putc( '.' ); mpi_putc( ' ' ); mpi_putc( 0 ); bufstrlen = strlen(buf); /* send the data */ bufptr = 0; retv = MPI_Send( &buf[bufptr], bufstrlen, MPI_CHAR, dest, tag, MPI_COMM_WORLD ); if( retv != MPI_SUCCESS ) return FALSE; return TRUE; } static Int p_mpi_receive() /* mpi_receive(-data, ?orig, ?tag) */ { Term t, t_data = Deref(ARG1), t_orig = Deref(ARG2), t_tag = Deref(ARG3); int tag, orig, retv; MPI_Status status; /* The first argument (data) must be unbound */ if(!IsVarTerm(t_data)) { Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_receive"); return FALSE; } /* The second argument (source) must be bound to an integer (the rank of the source) or left unbound (i.e. any source is OK) */ if (IsVarTerm(t_orig)) { orig = MPI_ANY_SOURCE; } else if( !IsIntegerTerm(t_orig) ) { Yap_Error(TYPE_ERROR_INTEGER, t_orig, "mpi_receive"); return (FALSE); } else { orig = IntOfTerm( t_orig ); } /* The third argument must be bound to an integer (the tag) or left unbound (i.e. any tag is OK) */ if (IsVarTerm(t_tag)) { tag = MPI_ANY_TAG; } else if( !IsIntegerTerm(t_tag) ) { Yap_Error(TYPE_ERROR_INTEGER, t_tag, "mpi_receive"); return (FALSE); } else tag = IntOfTerm( t_tag ); /* probe for the size of the term */ retv = MPI_Probe( orig, tag, MPI_COMM_WORLD, &status ); if( retv != MPI_SUCCESS ) { return FALSE; } MPI_Get_count( &status, MPI_CHAR, &bufstrlen ); /* adjust the buffer */ if( bufsize < bufstrlen ) expand_buffer(bufstrlen-bufsize); /* Already know the source from MPI_Probe() */ if( orig == MPI_ANY_SOURCE ) { orig = status.MPI_SOURCE; retv = Yap_unify(t_orig, MkIntTerm(orig)); if( retv == FALSE ) { printf("PROBLEM: file %s, line %d\n", __FILE__, __LINE__); } } /* Already know the tag from MPI_Probe() */ if( tag == MPI_ANY_TAG ) { tag = status.MPI_TAG; retv = Yap_unify(t_tag, MkIntTerm(status.MPI_TAG)); if( retv == FALSE ) { printf("PROBLEM: file %s, line %d\n", __FILE__, __LINE__); } } /* Receive the message as a C string */ retv = MPI_Recv( buf, bufstrlen, MPI_CHAR, orig, tag, MPI_COMM_WORLD, &status ); if( retv != MPI_SUCCESS ) { /* Getting in here would be weird; it means the first package (size) was sent properly, but there was a glitch with the actual content! */ return FALSE; } /* parse received string into a Prolog term */ bufptr = 0; t = YAP_Read( mpi_getc ); if( t == TermNil ) { retv = FALSE; } else { retv = Yap_unify(t, t_data); } return retv; } static Int p_mpi_bcast3() /* mpi_bcast( ?data, +root, +max_size ) */ { Term t_data = Deref(ARG1), t_root = Deref(ARG2), t_max_size = Deref(ARG3); int root, retv, max_size; /* The second argument must be bound to an integer (the rank of root processor */ if (IsVarTerm(t_root)) { Yap_Error(INSTANTIATION_ERROR, t_root, "mpi_bcast"); return FALSE; } root = IntOfTerm( t_root ); /* If this is the root processor, then the first argument must be bound to the term to be sent. */ if( root == rank ) { if( IsVarTerm(t_data) ) { Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_bcast"); return FALSE; } /* Turn the term into its ASCII representation */ bufptr = 0; YAP_Write( t_data, mpi_putc, Quote_illegal_f|Handle_vars_f ); /* NULL-terminate the string and add the ". " termination required by the parser. */ mpi_putc( '.' ); mpi_putc( ' ' ); mpi_putc( 0 ); bufstrlen = strlen(buf); } /* The third argument must be bound to an integer (the maximum length of the broadcast term's ASCII representation */ if (IsVarTerm(t_max_size)) { Yap_Error(INSTANTIATION_ERROR, t_max_size, "mpi_bcast"); return FALSE; } /* allow for the ". " bit and the NULL at the end */ max_size = IntOfTerm( t_max_size ) + 3; if( max_size < bufstrlen ) { /* issue a warning? explode? bcast s'thing unparsable? */ printf( "MAYDAY: max_size == %d, bufstrlen == %d\n ", max_size, bufstrlen); return FALSE; } /* adjust the buffer size, if necessary */ if( max_size > bufsize ) { expand_buffer( max_size-bufsize ); } retv = MPI_Bcast( buf, max_size, MPI_CHAR, root, MPI_COMM_WORLD ); if( retv != MPI_SUCCESS ) { printf( "OOOPS! MPI_Bcast() returned %d.\n", retv ); return FALSE; } if( root == rank ) return TRUE; else { /* ARG1 must be unbound so that it can receive data */ if( !IsVarTerm(t_data) ) { Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_bcast"); return FALSE; } bufstrlen = strlen(buf); bufptr = 0; /* parse received string into a Prolog term */ return Yap_unify( YAP_Read(mpi_getc), ARG1 ); } } /* This is the same as above, but for dynamic data size. It is implemented as two broadcasts, the first being the size and the second the actual data. */ static Int p_mpi_bcast2() /* mpi_bcast( ?data, +root ) */ { Term t_data = Deref(ARG1), t_root = Deref(ARG2); int root, retv; /* The second argument must be bound to an integer (the rank of root processor */ if (IsVarTerm(t_root)) { Yap_Error(INSTANTIATION_ERROR, t_root, "mpi_bcast"); return FALSE; } root = IntOfTerm( t_root ); /* If this is the root processor, then the first argument must be bound to the term to be sent. */ if( root == rank ) { if( IsVarTerm(t_data) ) { Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_bcast"); return FALSE; } bufptr = 0; /* Turn the term into its ASCII representation */ YAP_Write( t_data, mpi_putc, Quote_illegal_f|Handle_vars_f ); /* NULL-terminate the string and add the ". " termination required by the parser. */ buf[bufptr] = 0; strcat( buf, ". " ); bufstrlen = bufptr + 2; } /* Otherwise, it must a variable */ else { if( !IsVarTerm(t_data) ) { Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_bcast"); return FALSE; } } /* Broadcast the data size */ retv = MPI_Bcast( &bufstrlen, sizeof bufstrlen, MPI_INT, root, MPI_COMM_WORLD ); if( retv != MPI_SUCCESS ) { printf("PROBLEM: file %s, line %d\n", __FILE__, __LINE__); return FALSE; } /* adjust the buffer size, if necessary */ if( bufstrlen > bufsize ) { expand_buffer( bufstrlen - bufsize ); } /* Broadcast the data */ retv = MPI_Bcast( buf, bufstrlen, MPI_CHAR, root, MPI_COMM_WORLD ); if( retv != MPI_SUCCESS ) { printf("PROBLEM: file %s, line %d\n", __FILE__, __LINE__); return FALSE; } if( root == rank ) return TRUE; else { /* ARG1 must be unbound so that it can receive data */ if( !IsVarTerm(t_data) ) { Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_bcast"); return FALSE; } bufstrlen = strlen(buf); bufptr = 0; return Yap_unify(YAP_Read( mpi_getc ), ARG1); } } static Int p_mpi_barrier() /* mpi_barrier/0 */ { int retv; retv = MPI_Barrier( MPI_COMM_WORLD ); return (retv == 0); } /* * Init */ void Yap_InitMPI(void) { int i,j; mpi_argv = malloc( Yap_argc * sizeof(char *) ); mpi_argv[0] = strdup( Yap_argv[0] ); bufsize = RECV_BUF_SIZE; buf = malloc(bufsize * sizeof(char)); for( i=1; i<Yap_argc; ++i ) { if( !strcmp(Yap_argv[i], "--") ) { ++i; break; } } for( j=1; i<Yap_argc; ++i, ++j ) { mpi_argv[j] = strdup( Yap_argv[i] ); } mpi_argc = j; mpi_argv[0] = strdup( Yap_argv[0] ); Yap_InitCPred( "mpi_open", 3, p_mpi_open, SafePredFlag|SyncPredFlag ); Yap_InitCPred( "mpi_close", 0, p_mpi_close, SafePredFlag|SyncPredFlag ); Yap_InitCPred( "mpi_send", 3, p_mpi_send, SafePredFlag|SyncPredFlag ); Yap_InitCPred( "mpi_receive", 3, p_mpi_receive, SafePredFlag|SyncPredFlag ); Yap_InitCPred( "mpi_bcast", 3, p_mpi_bcast3, SafePredFlag|SyncPredFlag ); Yap_InitCPred( "mpi_bcast", 2, p_mpi_bcast2, SafePredFlag|SyncPredFlag ); Yap_InitCPred( "mpi_barrier", 0, p_mpi_barrier, SafePredFlag|SyncPredFlag ); } #endif /* HAVE_MPI */