diff --git a/C/c_interface.c b/C/c_interface.c index 268c700a7..9beec3e9e 100644 --- a/C/c_interface.c +++ b/C/c_interface.c @@ -556,6 +556,7 @@ X_API void *STD_PROTO(YAP_OpaqueObjectFromTerm,(Term)); X_API int STD_PROTO(YAP_Argv,(char *** argvp)); X_API YAP_tag_t STD_PROTO(YAP_TagOfTerm,(Term)); X_API size_t STD_PROTO(YAP_ExportTerm,(Term, char *, size_t)); +X_API size_t STD_PROTO(YAP_SizeOfExportedTerm,(char *)); X_API Term STD_PROTO(YAP_ImportTerm,(char *)); static UInt @@ -4008,10 +4009,14 @@ YAP_ExportTerm(Term inp, char * buf, size_t len) { size_t res; if (!len) return 0; - if ((res = Yap_ExportTerm(inp, buf, len, current_arity())) < 0) { - exit(1); - } - return res; + return Yap_ExportTerm(inp, buf, len, current_arity()); +} + +X_API size_t +YAP_SizeOfExportedTerm(char * buf) { + if (!buf) + return 0; + return Yap_SizeOfExportedTerm(buf); } X_API Term diff --git a/C/utilpreds.c b/C/utilpreds.c index db5838c68..788fdf393 100644 --- a/C/utilpreds.c +++ b/C/utilpreds.c @@ -1168,7 +1168,7 @@ CELL *CellDifH(CELL *hptr, CELL *hlow) return (CELL *)((char *)hptr-(char *)hlow); } -#define AdjustSizeAtom(X) ((char *)(((CELL)X+7) & (CELL)(-8))) +#define AdjustSizeAtom(X) ((char *)(((CELL)(X)+(8-1)) & ~(8-1))) static inline Atom export_atom(Atom at, char **hpp, char *buf, size_t len) @@ -1648,6 +1648,13 @@ Yap_ImportTerm(char * buf) { return tret; } +size_t +Yap_SizeOfExportedTerm(char * buf) { + CELL *bc = (CELL *)buf; + + return bc[0]+bc[1]*sizeof(CELL); +} + #define DEBUG_IMPORT 1 #if DEBUG_IMPORT diff --git a/H/Yapproto.h b/H/Yapproto.h index 32f38f634..e1a4a1219 100644 --- a/H/Yapproto.h +++ b/H/Yapproto.h @@ -390,6 +390,7 @@ void STD_PROTO(Yap_InitUserBacks,(void)); Term STD_PROTO(Yap_CopyTerm,(Term)); int STD_PROTO(Yap_Variant,(Term, Term)); size_t STD_PROTO(Yap_ExportTerm,(Term, char *, size_t, UInt)); +size_t STD_PROTO(Yap_SizeOfExportedTerm,(char *)); Term STD_PROTO(Yap_ImportTerm,(char *)); int STD_PROTO(Yap_IsListTerm,(Term)); Term STD_PROTO(Yap_CopyTermNoShare,(Term)); diff --git a/include/YapInterface.h b/include/YapInterface.h index 2db8a3e7a..371863aba 100644 --- a/include/YapInterface.h +++ b/include/YapInterface.h @@ -585,12 +585,14 @@ extern X_API YAP_Term PROTO(YAP_NewOpaqueObject,(YAP_opaque_tag_t, size_t)); extern X_API void *PROTO(YAP_OpaqueObjectFromTerm,(YAP_Term)); -extern X_API int *PROTO(YAP_Argv,(char ***)); +extern X_API int PROTO(YAP_Argv,(char ***)); extern X_API YAP_tag_t PROTO(YAP_TagOfTerm,(YAP_Term)); extern X_API size_t PROTO(YAP_ExportTerm,(YAP_Term, char *, size_t)); +extern X_API size_t PROTO(YAP_SizeOfExportedTerm,(char *)); + extern X_API YAP_Term PROTO(YAP_ImportTerm,(char *)); #define YAP_InitCPred(N,A,F) YAP_UserCPredicate(N,F,A) diff --git a/library/lammpi/hash.c b/library/lammpi/hash.c index c47ba7f07..c557a5d2a 100644 --- a/library/lammpi/hash.c +++ b/library/lammpi/hash.c @@ -53,7 +53,7 @@ __ptr_t get_next_object(hashtable table,ulong key) } -/* removes the element with key 'key' and returns the object stored on him */ +/* removes the element with key 'key' and returns the object stored on it */ __ptr_t delete(hashtable table,ulong key) { __ptr_t obj; diff --git a/library/lammpi/prologterms2c.c b/library/lammpi/prologterms2c.c index 6be7685a7..86966efb1 100644 --- a/library/lammpi/prologterms2c.c +++ b/library/lammpi/prologterms2c.c @@ -99,9 +99,9 @@ expand_buffer(const size_t space ) { void change_buffer_size(const size_t newsize) { - if ( BUFFER_SIZE>=BLOCK_SIZE && BUFFER_SIZE>newsize) + if ( BUFFER_SIZE>=BLOCK_SIZE && BUFFER_SIZE>=newsize) return; - if(BUFFER_PTR) { + if (BUFFER_PTR) { free(BUFFER_PTR); } BUFFER_PTR = (char*)malloc(newsize); @@ -185,17 +185,16 @@ term2string(char *const ptr, size_t *size, const YAP_Term t) { do { if (*size == 0) { - BUFFER_LEN = YAP_ExportTerm( t, BUFFER_PTR, BUFFER_SIZE );// canonical + *size = BUFFER_LEN = YAP_ExportTerm( t, BUFFER_PTR, BUFFER_SIZE );// canonical ret=BUFFER_PTR; + if (BUFFER_LEN == 0) { + expand_buffer(BLOCK_SIZE); + } } else { - BUFFER_LEN = YAP_ExportTerm( t, ptr, BUFFER_SIZE );// canonical + *size = YAP_ExportTerm( t, ptr, BUFFER_SIZE );// canonical ret=ptr; } - *size = BUFFER_LEN; - if (BUFFER_LEN == 0) { - expand_buffer(BLOCK_SIZE); - } - } while (BUFFER_LEN <= 0); + } while (*size <= 0); return ret; } /* @@ -206,31 +205,10 @@ string2term(char *const ptr,const size_t *size) { YAP_Term t; struct buffer_ds b; - b.size=b.len=b.pos=0; - if (BUFFER_PTR!=ptr) { // -#ifdef DEBUG - write_msg(__FUNCTION__,__FILE__,__LINE__,"copy buffer string2term\n"); -#endif - COPY_BUFFER_DS(buffer,b); // keep a copy of buffer_ds - BUFFER_PTR=ptr; // make the buffer use the buffer provided - BUFFER_LEN=*size; - BUFFER_SIZE=BUFFER_LEN; - } else { // b aux. struct. not needed - b.ptr=NULL; - } - BUFFER_POS=0; - t = YAP_ImportTerm( BUFFER_PTR ); + t = YAP_ImportTerm( ptr ); if ( t==FALSE ) { - write_msg(__FUNCTION__,__FILE__,__LINE__,"FAILED string2term>>>>size:%d %d %s\n",BUFFER_SIZE,strlen(BUFFER_PTR),NULL); + write_msg(__FUNCTION__,__FILE__,__LINE__,"FAILED string2term>>>>size:%lx %d\n",t,*size); exit(1); } - - if (b.ptr!=NULL) - COPY_BUFFER_DS(b,buffer); - -#ifdef DEBUG - write_msg(__FUNCTION__,__FILE__,__LINE__,"ending: buffer(ptr=%p;size=%d;pos=%d;len=%d)\n",BUFFER_PTR,BUFFER_SIZE,BUFFER_POS,BUFFER_LEN); -#endif - return t; } diff --git a/library/lammpi/prologterms2c.h b/library/lammpi/prologterms2c.h index c6cd937e0..56a632a18 100644 --- a/library/lammpi/prologterms2c.h +++ b/library/lammpi/prologterms2c.h @@ -70,7 +70,7 @@ void write_msg(const char *fun,const char *file, int line,const char *format, .. #define BLOCK_SIZE 4*1024 // deletes the buffer (all fields) but does not release the memory of the buffer.ptr -#define DEL_BUFFER {buffer.ptr=NULL;buffer.size=0;buffer.len=0;} +#define DEL_BUFFER {buffer.ptr=NULL;buffer.size=0;buffer.len=0;buffer.pos=0;} // informs the prologterm2c module that the buffer is now used and should not be messed #define USED_BUFFER() DEL_BUFFER // initialize buffer diff --git a/library/lammpi/yap_mpi.c b/library/lammpi/yap_mpi.c index 5b6eb90c5..7d6f79f67 100644 --- a/library/lammpi/yap_mpi.c +++ b/library/lammpi/yap_mpi.c @@ -81,6 +81,8 @@ extern int GLOBAL_argc; static hashtable requests=NULL; static hashtable broadcasts=NULL; +void init_mpi(void); + /******************************************************************** * Time accounting ********************************************************************/ @@ -116,8 +118,6 @@ static struct timeval _tstart, _tend; #include #include -void init_mpi(void); - void tstart(void) { struct rusage r; getrusage(RUSAGE_SELF,&r); @@ -407,12 +407,13 @@ mpi_isend(void) { #ifdef DEBUG write_msg(__FUNCTION__,__FILE__,__LINE__,"%s(%s,%u, MPI_CHAR,%d,%d)\n",__FUNCTION__,str,len,dest,tag); #endif + USED_BUFFER(); // informs the prologterm2c module that the buffer is now used and should not be messed // We must associate the string to each handle new_request(handle,str); - USED_BUFFER(); // informs the prologterm2c module that the buffer is now used and should not be messed PAUSE_TIMER(); RETURN(YAP_Unify(YAP_ARG4,YAP_MkIntTerm(HANDLE2INT(handle))));// it should always succeed } + /* * Blocking communication function. The message is sent immediatly. * mpi_send(+Data, +Destination, +Tag). @@ -454,7 +455,7 @@ static int mpi_recv(void) { YAP_Term t1 = YAP_Deref(YAP_ARG1), t2 = YAP_Deref(YAP_ARG2), - t3 = YAP_Deref(YAP_ARG3), + t3 = YAP_Deref(YAP_ARG3), t4; int tag, orig; int len=0; @@ -490,7 +491,7 @@ mpi_recv(void) { RETURN(FALSE); } //realloc memory buffer - change_buffer_size(len+1); + change_buffer_size((size_t)(len+1)); BUFFER_LEN=len; // Already know the source from MPI_Probe() if( orig == MPI_ANY_SOURCE ) { @@ -523,8 +524,9 @@ mpi_recv(void) { MSG_RECV(BUFFER_LEN); t4=string2term(BUFFER_PTR,&BUFFER_LEN); PAUSE_TIMER(); - RETURN(YAP_Unify(t3,t4)); + RETURN(YAP_Unify(YAP_ARG3,t4)); } + /* * Implements a non-blocking receive operation. * mpi_irecv(?Source,?Tag,-Handle). @@ -535,8 +537,6 @@ mpi_irecv(void) { t2 = YAP_Deref(YAP_ARG2), t3 = YAP_Deref(YAP_ARG3); int tag, orig; - int len=0; - MPI_Status status; MPI_Request *handle=(MPI_Request*)malloc(sizeof(MPI_Request)); // The third argument (data) must be unbound @@ -558,34 +558,8 @@ mpi_irecv(void) { else tag = YAP_IntOfTerm( t2 ); CONT_TIMER(); - - // probe for the size of the term - if( MPI_CALL(MPI_Probe( orig, tag, MPI_COMM_WORLD, &status )) != MPI_SUCCESS ){ - PAUSE_TIMER(); - RETURN(FALSE); - } - // - MPI_CALL(MPI_Get_count( &status, MPI_CHAR, &len )); - change_buffer_size(len+1); - BUFFER_LEN=len; - // Already know the source from MPI_Probe() - if( orig == MPI_ANY_SOURCE ) { - orig = status.MPI_SOURCE; - if ( YAP_Unify(t1, YAP_MkIntTerm(orig))!=TRUE ) { - PAUSE_TIMER(); - RETURN(FALSE); - } - } - // Already know the tag from MPI_Probe() - if( tag == MPI_ANY_TAG ) { - tag = status.MPI_TAG; - if (YAP_Unify(t2, YAP_MkIntTerm(status.MPI_TAG))!=TRUE) { - PAUSE_TIMER(); - RETURN(FALSE); - } - } - - if( MPI_CALL(MPI_Irecv( BUFFER_PTR, BUFFER_LEN, MPI_CHAR, orig, tag, + RESET_BUFFER; + if( MPI_CALL(MPI_Irecv( BUFFER_PTR, BLOCK_SIZE, MPI_CHAR, orig, tag, MPI_COMM_WORLD, handle )) != MPI_SUCCESS ) { PAUSE_TIMER(); RETURN(FALSE); @@ -595,6 +569,7 @@ mpi_irecv(void) { PAUSE_TIMER(); RETURN(YAP_Unify(t3,YAP_MkIntTerm(HANDLE2INT(handle)))); } + /* * Completes a non-blocking operation. IF the operation was a send, the * function waits until the message is buffered or sent by the runtime @@ -625,6 +600,7 @@ mpi_wait(void) { PAUSE_TIMER(); RETURN(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR))); } + /* * Provides information regarding a handle, ie. if a communication operation has been completed. * If the operation has been completed the predicate succeeds with the completion status, @@ -656,6 +632,7 @@ mpi_test(void) { PAUSE_TIMER(); RETURN(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR))); } + /* * Completes a non-blocking operation. IF the operation was a send, the * function waits until the message is buffered or sent by the runtime @@ -666,13 +643,12 @@ mpi_test(void) { */ static int mpi_wait_recv(void) { - YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle - t2 = YAP_Deref(YAP_ARG2), // Status - t3 = YAP_Deref(YAP_ARG3); // data + YAP_Term t1 = YAP_Deref(YAP_ARG1); // data MPI_Status status; MPI_Request *handle; char *s; int len,ret; + YAP_Term out; // The first argument (handle) must be an integer if(!YAP_IsIntTerm(t1)) { @@ -681,19 +657,22 @@ mpi_wait_recv(void) { CONT_TIMER(); handle=INT2HANDLE(YAP_IntOfTerm(t1)); + s=(char*)get_request(handle); // wait for communication completion if( MPI_CALL(MPI_Wait( handle , &status )) != MPI_SUCCESS) { PAUSE_TIMER(); RETURN(FALSE); } - s=(char*)get_request(handle); - len=strlen(s); - ret=YAP_Unify(t3,string2term(s,(size_t*)&len)); + len=YAP_SizeOfExportedTerm(s); + // make sure we only fetch ARG3 after constructing the term + out = string2term(s,(size_t*)&len); MSG_RECV(len); free_request(handle); PAUSE_TIMER(); - RETURN(ret & YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR))); + ret=YAP_Unify(YAP_ARG3,out); + RETURN(ret & YAP_Unify(YAP_ARG2,YAP_MkIntTerm(status.MPI_ERROR))); } + /* * Provides information regarding a handle, ie. if a communication operation has been completed. * If the operation has been completed the predicate succeeds with the completion status, @@ -703,14 +682,13 @@ mpi_wait_recv(void) { */ static int mpi_test_recv(void) { - YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle - t2 = YAP_Deref(YAP_ARG2), // Status - t3 = YAP_Deref(YAP_ARG3); // data + YAP_Term t1 = YAP_Deref(YAP_ARG1); // data MPI_Status status; MPI_Request *handle; int flag,len,ret; char *s; + YAP_Term out; // The first argument (handle) must be an integer if(!YAP_IsIntTerm(t1)) { @@ -726,11 +704,14 @@ mpi_test_recv(void) { } s=(char*)get_request(handle); len=strlen(s); - ret=YAP_Unify(t3,string2term(s,(size_t*)&len)); + out = string2term(s,(size_t*)&len); + // make sure we only fetch ARG3 after constructing the term + ret=YAP_Unify(YAP_ARG3,out); free_request(handle); PAUSE_TIMER(); - RETURN(ret & YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR))); + RETURN(ret & YAP_Unify(YAP_ARG2,YAP_MkIntTerm(status.MPI_ERROR))); } + /* * Collective communication function that performs a barrier synchronization among all processes. * mpi_barrier