ismall fixes to MPI interface.

This commit is contained in:
Vítor Santos Costa
2012-02-03 16:31:49 +00:00
parent ab33cacf7b
commit a2c86643b0
8 changed files with 62 additions and 88 deletions

View File

@@ -53,7 +53,7 @@ __ptr_t get_next_object(hashtable table,ulong key)
}
/* removes the element with key 'key' and returns the object stored on him */
/* removes the element with key 'key' and returns the object stored on it */
__ptr_t delete(hashtable table,ulong key)
{
__ptr_t obj;

View File

@@ -99,9 +99,9 @@ expand_buffer(const size_t space ) {
void
change_buffer_size(const size_t newsize) {
if ( BUFFER_SIZE>=BLOCK_SIZE && BUFFER_SIZE>newsize)
if ( BUFFER_SIZE>=BLOCK_SIZE && BUFFER_SIZE>=newsize)
return;
if(BUFFER_PTR) {
if (BUFFER_PTR) {
free(BUFFER_PTR);
}
BUFFER_PTR = (char*)malloc(newsize);
@@ -185,17 +185,16 @@ term2string(char *const ptr, size_t *size, const YAP_Term t) {
do {
if (*size == 0) {
BUFFER_LEN = YAP_ExportTerm( t, BUFFER_PTR, BUFFER_SIZE );// canonical
*size = BUFFER_LEN = YAP_ExportTerm( t, BUFFER_PTR, BUFFER_SIZE );// canonical
ret=BUFFER_PTR;
if (BUFFER_LEN == 0) {
expand_buffer(BLOCK_SIZE);
}
} else {
BUFFER_LEN = YAP_ExportTerm( t, ptr, BUFFER_SIZE );// canonical
*size = YAP_ExportTerm( t, ptr, BUFFER_SIZE );// canonical
ret=ptr;
}
*size = BUFFER_LEN;
if (BUFFER_LEN == 0) {
expand_buffer(BLOCK_SIZE);
}
} while (BUFFER_LEN <= 0);
} while (*size <= 0);
return ret;
}
/*
@@ -206,31 +205,10 @@ string2term(char *const ptr,const size_t *size) {
YAP_Term t;
struct buffer_ds b;
b.size=b.len=b.pos=0;
if (BUFFER_PTR!=ptr) { //
#ifdef DEBUG
write_msg(__FUNCTION__,__FILE__,__LINE__,"copy buffer string2term\n");
#endif
COPY_BUFFER_DS(buffer,b); // keep a copy of buffer_ds
BUFFER_PTR=ptr; // make the buffer use the buffer provided
BUFFER_LEN=*size;
BUFFER_SIZE=BUFFER_LEN;
} else { // b aux. struct. not needed
b.ptr=NULL;
}
BUFFER_POS=0;
t = YAP_ImportTerm( BUFFER_PTR );
t = YAP_ImportTerm( ptr );
if ( t==FALSE ) {
write_msg(__FUNCTION__,__FILE__,__LINE__,"FAILED string2term>>>>size:%d %d %s\n",BUFFER_SIZE,strlen(BUFFER_PTR),NULL);
write_msg(__FUNCTION__,__FILE__,__LINE__,"FAILED string2term>>>>size:%lx %d\n",t,*size);
exit(1);
}
if (b.ptr!=NULL)
COPY_BUFFER_DS(b,buffer);
#ifdef DEBUG
write_msg(__FUNCTION__,__FILE__,__LINE__,"ending: buffer(ptr=%p;size=%d;pos=%d;len=%d)\n",BUFFER_PTR,BUFFER_SIZE,BUFFER_POS,BUFFER_LEN);
#endif
return t;
}

View File

@@ -70,7 +70,7 @@ void write_msg(const char *fun,const char *file, int line,const char *format, ..
#define BLOCK_SIZE 4*1024
// deletes the buffer (all fields) but does not release the memory of the buffer.ptr
#define DEL_BUFFER {buffer.ptr=NULL;buffer.size=0;buffer.len=0;}
#define DEL_BUFFER {buffer.ptr=NULL;buffer.size=0;buffer.len=0;buffer.pos=0;}
// informs the prologterm2c module that the buffer is now used and should not be messed
#define USED_BUFFER() DEL_BUFFER
// initialize buffer

View File

@@ -81,6 +81,8 @@ extern int GLOBAL_argc;
static hashtable requests=NULL;
static hashtable broadcasts=NULL;
void init_mpi(void);
/********************************************************************
* Time accounting
********************************************************************/
@@ -116,8 +118,6 @@ static struct timeval _tstart, _tend;
#include <sys/resource.h>
#include <unistd.h>
void init_mpi(void);
void tstart(void) {
struct rusage r;
getrusage(RUSAGE_SELF,&r);
@@ -407,12 +407,13 @@ mpi_isend(void) {
#ifdef DEBUG
write_msg(__FUNCTION__,__FILE__,__LINE__,"%s(%s,%u, MPI_CHAR,%d,%d)\n",__FUNCTION__,str,len,dest,tag);
#endif
USED_BUFFER(); // informs the prologterm2c module that the buffer is now used and should not be messed
// We must associate the string to each handle
new_request(handle,str);
USED_BUFFER(); // informs the prologterm2c module that the buffer is now used and should not be messed
PAUSE_TIMER();
RETURN(YAP_Unify(YAP_ARG4,YAP_MkIntTerm(HANDLE2INT(handle))));// it should always succeed
}
/*
* Blocking communication function. The message is sent immediatly.
* mpi_send(+Data, +Destination, +Tag).
@@ -454,7 +455,7 @@ static int
mpi_recv(void) {
YAP_Term t1 = YAP_Deref(YAP_ARG1),
t2 = YAP_Deref(YAP_ARG2),
t3 = YAP_Deref(YAP_ARG3),
t3 = YAP_Deref(YAP_ARG3),
t4;
int tag, orig;
int len=0;
@@ -490,7 +491,7 @@ mpi_recv(void) {
RETURN(FALSE);
}
//realloc memory buffer
change_buffer_size(len+1);
change_buffer_size((size_t)(len+1));
BUFFER_LEN=len;
// Already know the source from MPI_Probe()
if( orig == MPI_ANY_SOURCE ) {
@@ -523,8 +524,9 @@ mpi_recv(void) {
MSG_RECV(BUFFER_LEN);
t4=string2term(BUFFER_PTR,&BUFFER_LEN);
PAUSE_TIMER();
RETURN(YAP_Unify(t3,t4));
RETURN(YAP_Unify(YAP_ARG3,t4));
}
/*
* Implements a non-blocking receive operation.
* mpi_irecv(?Source,?Tag,-Handle).
@@ -535,8 +537,6 @@ mpi_irecv(void) {
t2 = YAP_Deref(YAP_ARG2),
t3 = YAP_Deref(YAP_ARG3);
int tag, orig;
int len=0;
MPI_Status status;
MPI_Request *handle=(MPI_Request*)malloc(sizeof(MPI_Request));
// The third argument (data) must be unbound
@@ -558,34 +558,8 @@ mpi_irecv(void) {
else tag = YAP_IntOfTerm( t2 );
CONT_TIMER();
// probe for the size of the term
if( MPI_CALL(MPI_Probe( orig, tag, MPI_COMM_WORLD, &status )) != MPI_SUCCESS ){
PAUSE_TIMER();
RETURN(FALSE);
}
//
MPI_CALL(MPI_Get_count( &status, MPI_CHAR, &len ));
change_buffer_size(len+1);
BUFFER_LEN=len;
// Already know the source from MPI_Probe()
if( orig == MPI_ANY_SOURCE ) {
orig = status.MPI_SOURCE;
if ( YAP_Unify(t1, YAP_MkIntTerm(orig))!=TRUE ) {
PAUSE_TIMER();
RETURN(FALSE);
}
}
// Already know the tag from MPI_Probe()
if( tag == MPI_ANY_TAG ) {
tag = status.MPI_TAG;
if (YAP_Unify(t2, YAP_MkIntTerm(status.MPI_TAG))!=TRUE) {
PAUSE_TIMER();
RETURN(FALSE);
}
}
if( MPI_CALL(MPI_Irecv( BUFFER_PTR, BUFFER_LEN, MPI_CHAR, orig, tag,
RESET_BUFFER;
if( MPI_CALL(MPI_Irecv( BUFFER_PTR, BLOCK_SIZE, MPI_CHAR, orig, tag,
MPI_COMM_WORLD, handle )) != MPI_SUCCESS ) {
PAUSE_TIMER();
RETURN(FALSE);
@@ -595,6 +569,7 @@ mpi_irecv(void) {
PAUSE_TIMER();
RETURN(YAP_Unify(t3,YAP_MkIntTerm(HANDLE2INT(handle))));
}
/*
* Completes a non-blocking operation. IF the operation was a send, the
* function waits until the message is buffered or sent by the runtime
@@ -625,6 +600,7 @@ mpi_wait(void) {
PAUSE_TIMER();
RETURN(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR)));
}
/*
* Provides information regarding a handle, ie. if a communication operation has been completed.
* If the operation has been completed the predicate succeeds with the completion status,
@@ -656,6 +632,7 @@ mpi_test(void) {
PAUSE_TIMER();
RETURN(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR)));
}
/*
* Completes a non-blocking operation. IF the operation was a send, the
* function waits until the message is buffered or sent by the runtime
@@ -666,13 +643,12 @@ mpi_test(void) {
*/
static int
mpi_wait_recv(void) {
YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle
t2 = YAP_Deref(YAP_ARG2), // Status
t3 = YAP_Deref(YAP_ARG3); // data
YAP_Term t1 = YAP_Deref(YAP_ARG1); // data
MPI_Status status;
MPI_Request *handle;
char *s;
int len,ret;
YAP_Term out;
// The first argument (handle) must be an integer
if(!YAP_IsIntTerm(t1)) {
@@ -681,19 +657,22 @@ mpi_wait_recv(void) {
CONT_TIMER();
handle=INT2HANDLE(YAP_IntOfTerm(t1));
s=(char*)get_request(handle);
// wait for communication completion
if( MPI_CALL(MPI_Wait( handle , &status )) != MPI_SUCCESS) {
PAUSE_TIMER();
RETURN(FALSE);
}
s=(char*)get_request(handle);
len=strlen(s);
ret=YAP_Unify(t3,string2term(s,(size_t*)&len));
len=YAP_SizeOfExportedTerm(s);
// make sure we only fetch ARG3 after constructing the term
out = string2term(s,(size_t*)&len);
MSG_RECV(len);
free_request(handle);
PAUSE_TIMER();
RETURN(ret & YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR)));
ret=YAP_Unify(YAP_ARG3,out);
RETURN(ret & YAP_Unify(YAP_ARG2,YAP_MkIntTerm(status.MPI_ERROR)));
}
/*
* Provides information regarding a handle, ie. if a communication operation has been completed.
* If the operation has been completed the predicate succeeds with the completion status,
@@ -703,14 +682,13 @@ mpi_wait_recv(void) {
*/
static int
mpi_test_recv(void) {
YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle
t2 = YAP_Deref(YAP_ARG2), // Status
t3 = YAP_Deref(YAP_ARG3); // data
YAP_Term t1 = YAP_Deref(YAP_ARG1); // data
MPI_Status status;
MPI_Request *handle;
int flag,len,ret;
char *s;
YAP_Term out;
// The first argument (handle) must be an integer
if(!YAP_IsIntTerm(t1)) {
@@ -726,11 +704,14 @@ mpi_test_recv(void) {
}
s=(char*)get_request(handle);
len=strlen(s);
ret=YAP_Unify(t3,string2term(s,(size_t*)&len));
out = string2term(s,(size_t*)&len);
// make sure we only fetch ARG3 after constructing the term
ret=YAP_Unify(YAP_ARG3,out);
free_request(handle);
PAUSE_TIMER();
RETURN(ret & YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR)));
RETURN(ret & YAP_Unify(YAP_ARG2,YAP_MkIntTerm(status.MPI_ERROR)));
}
/*
* Collective communication function that performs a barrier synchronization among all processes.
* mpi_barrier