MPI fixes

This commit is contained in:
Vítor Santos Costa 2015-02-11 01:50:59 +00:00
parent 9743c81f05
commit 2d919a4a09
7 changed files with 188 additions and 124 deletions

View File

@ -78,6 +78,8 @@ if (MPI_C_FOUND)
include_directories (${MPI_INCLUDE_DIRS})
add_definitions (-DHAVE_MPI_H=1)
install(TARGETS yap_mpi
LIBRARY DESTINATION ${dlls}
)

View File

@ -3,9 +3,6 @@
# (EROOT for architecture-dependent files)
#
#
# default base directory for YAP installation
# (EROOT for architecture-dependent files)
#
prefix = @prefix@
exec_prefix = @exec_prefix@
ROOTDIR = $(prefix)
@ -62,7 +59,7 @@ hash.o: $(srcdir)/hash.c $(srcdir)/hash.h
@DO_SECOND_LD@ @SHLIB_LD@ $(LDFLAGS) -o yap_mpi.@SO@ $(OBJS) $(MPILDF) @EXTRA_LIBS_FOR_DLLS@
install: all install-examples
@if test "$(SOBJS)" = ""; then echo ""; else $(INSTALL_PROGRAM) $(SOBJS) $(DESTDIR)$(YAPLIBDIR); fi
if test x"$(SOBJS)" != "x"; then $(INSTALL_PROGRAM) $(SOBJS) $(DESTDIR)$(YAPLIBDIR); fi
clean:
rm -f *.o *~ $(OBJS) $(SOBJS) *.BAK

View File

@ -35,7 +35,7 @@ fi
dnl LAM/MPI interface
if test "$yap_cv_mpi" != "no" -a "$INSTALL_DLLS" != "no"; then
if test "$yap_cv_mpi" != "yes"; then
if test "$yap_cv_mpi" = "yes"; then
AC_PATH_PROG(MPI_CC,mpicc,"$CC",$PATH:/sbin:/usr/sbin:/usr/etc:/usr/local/sbin:/usr/lib64/openmpi/bin:/usr/lib/openmpi/bin)
else
AC_PATH_PROG(MPI_CC,mpicc,"$CC",$yap_cv_mpi/bin:$PATH:/sbin:/usr/sbin:/usr/etc:/usr/local/sbin:/usr/lib64/openmpi/bin:/usr/lib/openmpi/bin)

47
library/lammpi/examples/gowait Executable file
View File

@ -0,0 +1,47 @@
#!/usr/local/bin/yap -L --
% called with
% mpirun -np 2 gowait.
% prints
% ------
% main
% main
% after_init
% after_init
% [0,2]
% [1,2]
% irecv
% wait_end
% after_send
% c(535755152,)
:- use_module(library(lam_mpi)).
:- use_module(library(system)).
main:-
write(main),nl,
mpi_init,
write(after_init),nl,
mpi_comm_size(S),
mpi_comm_rank(R),
write([R,S]),nl,
(R == 0->
sleep(2),
write(wait_end),nl,
mpi_send(ciao,1,201),
write(after_send),nl
;
mpi_irecv(0,_201,H),
write(irecv),nl,
test(H)
),
mpi_finalize.
test(H):-
(mpi_wait_recv(H,S,Data)->
write(c(S,Data)),nl
;
write(no),nl,
test(H)
).
:-main.

View File

@ -98,11 +98,23 @@ expand_buffer(const size_t space ) {
/*
* Changes the size of the buffer to contain at least newsize bytes
*/
void
change_buffer_size(const size_t newsize) {
if ( BUFFER_SIZE>=BLOCK_SIZE && BUFFER_SIZE>=newsize)
return;
if (realloc( BUFFER_PTR, newsize) == NULL) {
void change_buffer_size(const size_t newsize) {
if ( BUFFER_PTR == NULL )
{
if ((BUFFER_PTR = malloc( BLOCK_SIZE < newsize ? newsize : BLOCK_SIZE)) == NULL) {
YAP_Error(0,0,"Prolog2Term: Out of memory.\n");
#ifdef MPI
MPI_Finalize();
#endif
YAP_Exit( 1 );
}
}
else if ((BUFFER_SIZE>=BLOCK_SIZE &&
BUFFER_SIZE>=newsize) )
{
return;
}
else if ((BUFFER_PTR = realloc( BUFFER_PTR, newsize)) == NULL) {
YAP_Error(0,0,"Prolog2Term: Out of memory.\n");
#ifdef MPI
MPI_Finalize();
@ -137,7 +149,7 @@ p2c_putt(const YAP_Term t) {
size_t
write_term_to_stream(const int fd,const YAP_Term term) {
RESET_BUFFER;
RESET_BUFFER();
printf("BUFFER_PTR=%p\n", BUFFER_PTR);
p2c_putt(term);
if (write(fd,(void*)BUFFER_PTR,BUFFER_LEN) < 0) { // write term
@ -154,7 +166,7 @@ YAP_Term
read_term_from_stream(const int fd) {
size_t size;
RESET_BUFFER;
RESET_BUFFER();
if (!read(fd,(void*)&size,sizeof(size_t))) { // read the size of the term
YAP_Error(0,0,"Prolog2Term: IO error in read.\n");
}
@ -179,7 +191,7 @@ read_term_from_stream(const int fd) {
char*
term2string(char *const ptr, size_t *size, const YAP_Term t) {
char *ret;
RESET_BUFFER;
RESET_BUFFER();
do {
if (*size == 0) {

View File

@ -68,16 +68,17 @@ void write_msg(const char *fun,const char *file, int line,const char *format, ..
* Macros to manipulate the buffer
*********************************************************************************************/
extern int BLOCK_SIZE;
extern size_t BLOCK_SIZE;
#define buffer (buffers[YAP_ThreadSelf()])
// deletes the buffer (all fields) but does not release the memory of the buffer.ptr
#define DEL_BUFFER {buffer.ptr=NULL;buffer.size=0;buffer.len=0;buffer.pos=0;}
#define DEL_BUFFER() {buffer.ptr=NULL;buffer.size=0;buffer.len=0;buffer.pos=0;}
// informs the prologterm2c module that the buffer is now used and should not be messed
#define USED_BUFFER() DEL_BUFFER
#define USED_BUFFER() DEL_BUFFER()
// initialize buffer
#define RESET_BUFFER {buffer.len=0;change_buffer_size(BLOCK_SIZE);buffer.pos=0;}
#define RESET_BUFFER() \
{buffer.len=0;change_buffer_size(BLOCK_SIZE);buffer.pos=0;}
#define BUFFER_PTR buffer.ptr
#define BUFFER_SIZE buffer.size
#define BUFFER_LEN buffer.len

View File

@ -1,14 +1,14 @@
/*
Copyright (C) 2004,2005,2006 (Nuno A. Fonseca) <nuno.fonseca@gmail.com>
This program is free software; you can redistribute it and/or
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
but WITHOUT ANY WxuARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
@ -69,7 +69,7 @@ typedef struct broadcast_req BroadcastRequest;
/********************************************************************
* Auxiliary data
********************************************************************/
static int mpi_statuss[1024];
static YAP_Bool mpi_statuss[1024];
#define mpi_status (mpi_statuss[YAP_ThreadSelf()])
extern int GLOBAL_argc;
@ -108,7 +108,7 @@ static double total_time_spent; // total time spend in communication code
#define CONT_TIMER() {tstart();}
#define PAUSE_TIMER() {tend();total_time_spent+=tval();}
#define RETURN(p) {PAUSE_TIMER();return (p);}
#define return(p) {PAUSE_TIMER();return (p);}
static struct timeval _tstarts[1024], _tends[1024];
@ -141,7 +141,7 @@ double tval(){
/*
* returns the statistics
*/
static int mpi_stats(void){
static YAP_Bool mpi_stats(void){
fprintf(stderr,"%f %ld %ld %ld %ld %ld %ld\n",MPITIME,num_msgs_recv,bytes_recv,max_s_recv_msg,num_msgs_sent,bytes_sent,max_s_sent_msg);
return (YAP_Unify(YAP_ARG1, YAP_MkFloatTerm((float)(MPITIME))) &&
YAP_Unify(YAP_ARG2, YAP_MkIntTerm((long)num_msgs_recv)) &&
@ -155,7 +155,7 @@ static int mpi_stats(void){
/*
*
*/
static int mpi_reset_stats(void) {RESET_STATS(); return (TRUE);}
static YAP_Bool mpi_reset_stats(void) {RESET_STATS(); return true;}
#else
#define PAUSE_TIMER()
@ -165,7 +165,7 @@ static int mpi_reset_stats(void) {RESET_STATS(); return (TRUE);}
#define RESET_STATS()
#define MSG_SENT(size)
#define MSG_RECV(size)
#define RETURN(p) {return (p);}
#define return(p) {return (p);}
#endif
/********************************************************************
@ -240,7 +240,7 @@ new_broadcast_request(BroadcastRequest* b,MPI_Request *handle,void* ptr) {
}
/*********************************************************************/
static int mpi_error(int errcode){
static YAP_Bool mpi_error(int errcode){
char err_msg[MPI_MAX_ERROR_STRING];
int len;
@ -258,7 +258,7 @@ static int mpi_error(int errcode){
* Sets up the mpi enviromment. This function should be called before any other MPI
* function.
*/
static int
static YAP_Bool
mpi_init(void){
int thread_level;
char ** my_argv;
@ -271,7 +271,7 @@ mpi_init(void){
#ifdef MPISTATS
RESET_STATS();
#endif
return (TRUE);
return true;
}
#ifdef USE_THREADS
@ -280,7 +280,7 @@ mpi_init(void){
* function.
* the argument is the name of the predicate that will be invoked when a message is received
*/
static int
static YAP_Bool
rcv_msg_thread(char *handle_pred) {
YAP_Term pred=YAP_MkAtomTerm(YAP_LookupAtom(handle_pred));
MPI_Status status;
@ -299,7 +299,7 @@ rcv_msg_thread(char *handle_pred) {
/*
*
*/
static int
static YAP_Bool
mpi_init_rcv_thread(void){
int thread_level;
// MPI_Init(&GLOBAL_argc, &GLOBAL_argv);
@ -308,12 +308,12 @@ mpi_init_rcv_thread(void){
MPI_Init_thread(&GLOBAL_argc, &GLOBAL_argv,MPI_THREAD_SINGLE,&thread_level);
if(pthread_create(&thread,NULL,(void*)&rcv_msg_thread,arg)) {
return (FALSE);
return false;
}
pthread_detach(thread);
write_msg(__FUNCTION__,__FILE__,__LINE__,"Thread level: %d\n",thread_level);
return (TRUE);
return true;
}
#endif
@ -322,15 +322,15 @@ mpi_init_rcv_thread(void){
* exiting.
* mpi_comm_finalize.
*/
static int
static YAP_Bool
mpi_finalize(void){
return (MPI_Finalize()==MPI_SUCCESS?TRUE:FALSE);
return (MPI_Finalize()==MPI_SUCCESS?true:false);
}
/*
* Returns the number of workers associated to the MPI_COMM_WORLD communicator.
* mpi_comm_size(-Size).
*/
static int
static YAP_Bool
mpi_comm_size(void){
int size;
MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD, &size));
@ -340,7 +340,7 @@ mpi_comm_size(void){
* Returns the rank of the current process.
* mpi_comm_rank(-Rank).
*/
static int
static YAP_Bool
mpi_comm_rank(void){
int rank;
MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rank));
@ -350,7 +350,7 @@ mpi_comm_rank(void){
* Returns the major and minor version of MPI.
* mpi_version(-Major,-Minor).
*/
static int
static YAP_Bool
mpi_version(void){
int major,minor;
@ -361,7 +361,7 @@ mpi_version(void){
*
*
*/
static int
static YAP_Bool
mpi_get_processor_name(void) {
char name[MPI_MAX_PROCESSOR_NAME];
int length;
@ -375,7 +375,7 @@ mpi_get_processor_name(void) {
*
* mpi_isend(+Data, +Destination, +Tag, -Handle).
*/
static int
static YAP_Bool
mpi_isend(void) {
YAP_Term t1 = YAP_Deref(YAP_ARG1),
t2 = YAP_Deref(YAP_ARG2),
@ -387,11 +387,11 @@ mpi_isend(void) {
MPI_Request *handle=(MPI_Request*)malloc(sizeof(MPI_Request));
CONT_TIMER();
if ( handle==NULL ) return (FALSE);
if ( handle==NULL ) return false;
if (YAP_IsVarTerm(t1) || !YAP_IsIntTerm(t2) || !YAP_IsIntTerm(t3) || !YAP_IsVarTerm(t4)) {
PAUSE_TIMER();
RETURN(FALSE);
return false;
}
//
dest = YAP_IntOfTerm(t2);
@ -402,7 +402,7 @@ mpi_isend(void) {
// send the data
if( MPI_CALL(MPI_Isend( str, len, MPI_CHAR, dest, tag, MPI_COMM_WORLD ,handle)) != MPI_SUCCESS ) {
PAUSE_TIMER();
RETURN(FALSE);
return false;
}
#ifdef DEBUG
@ -412,14 +412,14 @@ mpi_isend(void) {
// We must associate the string to each handle
new_request(handle,str);
PAUSE_TIMER();
RETURN(YAP_Unify(YAP_ARG4,YAP_MkIntTerm(HANDLE2INT(handle))));// it should always succeed
return(YAP_Unify(YAP_ARG4,YAP_MkIntTerm(HANDLE2INT(handle))));// it should always succeed
}
/*
* Blocking communication function. The message is sent immediatly.
* mpi_send(+Data, +Destination, +Tag).
*/
static int
static YAP_Bool
mpi_send(void) {
YAP_Term t1 = YAP_Deref(YAP_ARG1),
@ -430,7 +430,7 @@ mpi_send(void) {
size_t len=0;
int val;
if (YAP_IsVarTerm(t1) || !YAP_IsIntTerm(t2) || !YAP_IsIntTerm(t3)) {
return (FALSE);
return false;
}
CONT_TIMER();
@ -439,20 +439,20 @@ mpi_send(void) {
tag = YAP_IntOfTerm(t3);
// the data is packaged as a string
str=term2string(NULL,&len,t1);
#ifdef DEBUG
#if defined(DEBUG) && 0
write_msg(__FUNCTION__,__FILE__,__LINE__,"%s(%s,%u, MPI_CHAR,%d,%d)\n",__FUNCTION__,str,len,dest,tag);
#endif
// send the data
val=(MPI_CALL(MPI_Send( str, len, MPI_CHAR, dest, tag, MPI_COMM_WORLD))==MPI_SUCCESS?TRUE:FALSE);
val=(MPI_CALL(MPI_Send( str, len, MPI_CHAR, dest, tag, MPI_COMM_WORLD))==MPI_SUCCESS?true:false);
PAUSE_TIMER();
RETURN(val);
return(val);
}
/*
* Implements a blocking receive operation.
* mpi_recv(?Source,?Tag,-Data).
*/
static int
static YAP_Bool
mpi_recv(void) {
YAP_Term t1 = YAP_Deref(YAP_ARG1),
t2 = YAP_Deref(YAP_ARG2),
@ -464,32 +464,32 @@ mpi_recv(void) {
//The third argument (data) must be unbound
if(!YAP_IsVarTerm(t3)) {
return FALSE;
return false;
}
/* The first argument (Source) must be bound to an integer
(the rank of the source) or left unbound (i.e. any source
is OK) */
if (YAP_IsVarTerm(t1)) orig = MPI_ANY_SOURCE;
else if( !YAP_IsIntTerm(t1) ) return (FALSE);
else if( !YAP_IsIntTerm(t1) ) return false;
else orig = YAP_IntOfTerm(t1);
/* The second argument must be bound to an integer (the tag)
or left unbound (i.e. any tag is OK) */
if (YAP_IsVarTerm(t2)) tag = MPI_ANY_TAG;
else if( !YAP_IsIntTerm(t2) ) return (FALSE);
else if( !YAP_IsIntTerm(t2) ) return false;
else tag = YAP_IntOfTerm( t2 );
CONT_TIMER();
// probe for term' size
if( MPI_CALL(MPI_Probe( orig, tag, MPI_COMM_WORLD, &status )) != MPI_SUCCESS) {
PAUSE_TIMER();
RETURN(FALSE);
return false;
}
if( MPI_CALL(MPI_Get_count( &status, MPI_CHAR, &len )) != MPI_SUCCESS ||
status.MPI_TAG==MPI_UNDEFINED ||
status.MPI_SOURCE==MPI_UNDEFINED) {
PAUSE_TIMER();
RETURN(FALSE);
return false;
}
//realloc memory buffer
change_buffer_size((size_t)(len+1));
@ -499,7 +499,7 @@ mpi_recv(void) {
orig = status.MPI_SOURCE;
if( !YAP_Unify(t1, YAP_MkIntTerm(orig))) {
PAUSE_TIMER();
RETURN(FALSE);
return false;
}
}
// Already know the tag from MPI_Probe()
@ -507,7 +507,7 @@ mpi_recv(void) {
tag = status.MPI_TAG;
if( !YAP_Unify(t2, YAP_MkIntTerm(status.MPI_TAG))) {
PAUSE_TIMER();
RETURN(FALSE);
return false;
}
}
// Receive the message as a string
@ -517,7 +517,7 @@ mpi_recv(void) {
package (containing size) was sent properly, but there was a glitch with
the actual content! */
PAUSE_TIMER();
RETURN(FALSE);
return false;
}
#ifdef DEBUG
write_msg(__FUNCTION__,__FILE__,__LINE__,"%s(%s,%u, MPI_CHAR,%d,%d)\n",__FUNCTION__,BUFFER_PTR, BUFFER_LEN, orig, tag);
@ -525,50 +525,50 @@ mpi_recv(void) {
MSG_RECV(BUFFER_LEN);
t4=string2term(BUFFER_PTR,&BUFFER_LEN);
PAUSE_TIMER();
RETURN(YAP_Unify(YAP_ARG3,t4));
return(YAP_Unify(YAP_ARG3,t4));
}
/*
* Implements a non-blocking receive operation.
* mpi_irecv(?Source,?Tag,-Handle).
*/
static int
static YAP_Bool
mpi_irecv(void) {
YAP_Term t1 = YAP_Deref(YAP_ARG1),
t2 = YAP_Deref(YAP_ARG2),
t3 = YAP_Deref(YAP_ARG3);
int tag, orig;
MPI_Request *handle=(MPI_Request*)malloc(sizeof(MPI_Request));
MPI_Request *mpi_req=(MPI_Request*)malloc(sizeof(MPI_Request));
// The third argument (data) must be unbound
// The third argument (data) must be unbound
if(!YAP_IsVarTerm(t3)) {
//Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_receive");
return FALSE;
return false;
}
/* The first argument (Source) must be bound to an integer
(the rank of the source) or left unbound (i.e. any source
is OK) */
if (YAP_IsVarTerm(t1)) orig = MPI_ANY_SOURCE;
else if( !YAP_IsIntTerm(t1) ) return (FALSE);
else if( !YAP_IsIntTerm(t1) ) return false;
else orig = YAP_IntOfTerm(t1);
/* The third argument must be bound to an integer (the tag)
or left unbound (i.e. any tag is OK) */
if (YAP_IsVarTerm(t2)) tag = MPI_ANY_TAG;
else if( !YAP_IsIntTerm(t2) ) return (FALSE);
else if( !YAP_IsIntTerm(t2) ) return false;
else tag = YAP_IntOfTerm( t2 );
CONT_TIMER();
RESET_BUFFER;
RESET_BUFFER();
if( MPI_CALL(MPI_Irecv( BUFFER_PTR, BLOCK_SIZE, MPI_CHAR, orig, tag,
MPI_COMM_WORLD, handle )) != MPI_SUCCESS ) {
MPI_COMM_WORLD, mpi_req )) != MPI_SUCCESS ) {
PAUSE_TIMER();
RETURN(FALSE);
return false;
}
new_request(handle,BUFFER_PTR);
DEL_BUFFER; // force the realocation of a new memory block
new_request(mpi_req,BUFFER_PTR);
DEL_BUFFER();
PAUSE_TIMER();
RETURN(YAP_Unify(t3,YAP_MkIntTerm(HANDLE2INT(handle))));
return YAP_Unify(t3,YAP_MkIntTerm(HANDLE2INT(mpi_req)));
}
/*
@ -579,7 +579,7 @@ mpi_irecv(void) {
* buffer.
* mpi_wait(+Handle,-Status).
*/
static int
static YAP_Bool
mpi_wait(void) {
YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle
t2 = YAP_Deref(YAP_ARG2); // Status
@ -588,27 +588,29 @@ mpi_wait(void) {
// The first argument must be an integer (an handle)
if(!YAP_IsIntTerm(t1)) {
return FALSE;
return false;
}
handle=INT2HANDLE(YAP_IntOfTerm(t1));
CONT_TIMER();
// probe for term' size
if( MPI_CALL(MPI_Wait( handle , &status )) != MPI_SUCCESS ) {
PAUSE_TIMER();
RETURN(FALSE);
return false;
}
free_request(handle);
PAUSE_TIMER();
RETURN(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR)));
return(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR)));
}
/*
* mpi_test(+Handle,-Status)
*
* Provides information regarding a handle, ie. if a communication operation has been completed.
* If the operation has been completed the predicate succeeds with the completion status,
* otherwise it fails.
* mpi_test(+Handle,-Status).
* ).
*/
static int
static YAP_Bool
mpi_test(void) {
YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle
t2 = YAP_Deref(YAP_ARG2); // Status
@ -618,31 +620,32 @@ mpi_test(void) {
// The first argument (handle) must be an integer
if(!YAP_IsIntTerm(t1)) {
return FALSE;
return false;
}
CONT_TIMER();
handle=INT2HANDLE(YAP_IntOfTerm(t1));
//
MPI_CALL(MPI_Test( handle , &flag, &status ));
if( flag != TRUE ) {
if( flag != true ) {
PAUSE_TIMER();
RETURN(FALSE);
return false;
}
free_request(handle);
PAUSE_TIMER();
RETURN(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR)));
return(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR)));
}
/*
/** mpi_wait(+Handle,-Status,-Data
*
* Completes a non-blocking operation. IF the operation was a send, the
* function waits until the message is buffered or sent by the runtime
* system. At this point the send buffer is released. If the operation
* was a receive, it waits until the message is copied to the receive
* buffer.
* mpi_wait(+Handle,-Status,-Data).
* .
*/
static int
static YAP_Bool
mpi_wait_recv(void) {
YAP_Term t1 = YAP_Deref(YAP_ARG1); // data
MPI_Status status;
@ -653,7 +656,7 @@ mpi_wait_recv(void) {
// The first argument (handle) must be an integer
if(!YAP_IsIntTerm(t1)) {
return FALSE;
return false;
}
CONT_TIMER();
@ -662,7 +665,7 @@ mpi_wait_recv(void) {
// wait for communication completion
if( MPI_CALL(MPI_Wait( handle , &status )) != MPI_SUCCESS) {
PAUSE_TIMER();
RETURN(FALSE);
return false;
}
len=YAP_SizeOfExportedTerm(s);
// make sure we only fetch ARG3 after constructing the term
@ -671,7 +674,7 @@ mpi_wait_recv(void) {
free_request(handle);
PAUSE_TIMER();
ret=YAP_Unify(YAP_ARG3,out);
RETURN(ret & YAP_Unify(YAP_ARG2,YAP_MkIntTerm(status.MPI_ERROR)));
return(ret & YAP_Unify(YAP_ARG2,YAP_MkIntTerm(status.MPI_ERROR)));
}
/*
@ -681,7 +684,7 @@ mpi_wait_recv(void) {
*
* mpi_test(+Handle,-Status,-Data).
*/
static int
static YAP_Bool
mpi_test_recv(void) {
YAP_Term t1 = YAP_Deref(YAP_ARG1); // data
@ -693,7 +696,7 @@ mpi_test_recv(void) {
// The first argument (handle) must be an integer
if(!YAP_IsIntTerm(t1)) {
return FALSE;
return false;
}
CONT_TIMER();
@ -701,7 +704,7 @@ mpi_test_recv(void) {
//
if( MPI_CALL(MPI_Test( handle , &flag, &status ))!=MPI_SUCCESS) {
PAUSE_TIMER();
RETURN(FALSE);
return false;
}
s=(char*)get_request(handle);
len=strlen(s);
@ -710,19 +713,19 @@ mpi_test_recv(void) {
ret=YAP_Unify(YAP_ARG3,out);
free_request(handle);
PAUSE_TIMER();
RETURN(ret & YAP_Unify(YAP_ARG2,YAP_MkIntTerm(status.MPI_ERROR)));
return(ret & YAP_Unify(YAP_ARG2,YAP_MkIntTerm(status.MPI_ERROR)));
}
/*
* Collective communication function that performs a barrier synchronization among all processes.
* mpi_barrier
*/
static int
static YAP_Bool
mpi_barrier(void) {
CONT_TIMER();
int ret=MPI_CALL(MPI_Barrier(MPI_COMM_WORLD));
PAUSE_TIMER();
return (ret==MPI_SUCCESS?TRUE:FALSE);
return (ret==MPI_SUCCESS?true:false);
}
/***********************************
* Broadcast
@ -735,7 +738,7 @@ mpi_barrier(void) {
*
* mpi_bcast(+Root,+Data).
*/
static int
static YAP_Bool
mpi_bcast(void) {
YAP_Term t1 = YAP_Deref(YAP_ARG1),
t2 = YAP_Deref(YAP_ARG2);
@ -745,7 +748,7 @@ mpi_bcast(void) {
int rank;
//The arguments should be bound
if(!YAP_IsIntTerm(t1)) {
return FALSE;
return false;
}
MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rank));
@ -757,12 +760,12 @@ mpi_bcast(void) {
write_msg(__FUNCTION__,__FILE__,__LINE__,"mpi_bcast(%s,%u, MPI_CHAR,%d)\n",str,len,root);
#endif
} else {
RESET_BUFFER;
RESET_BUFFER();
str = BUFFER_PTR;
len = BLOCK_SIZE;
}
// send the data
val=(MPI_CALL(MPI_Bcast( str, len, MPI_CHAR, root, MPI_COMM_WORLD))==MPI_SUCCESS?TRUE:FALSE);
val=(MPI_CALL(MPI_Bcast( str, len, MPI_CHAR, root, MPI_COMM_WORLD))==MPI_SUCCESS?true:false);
#ifdef MPISTATS
@ -780,9 +783,9 @@ mpi_bcast(void) {
out = string2term(str,(size_t*)&len);
MSG_RECV(len);
if (!YAP_Unify(YAP_ARG2, out))
return FALSE;
return false;
}
RETURN(val);
return(val);
}
/*
@ -792,7 +795,7 @@ mpi_bcast(void) {
* To be able to use a regular MPI_Recv to recv the messages, one should use mpi_bcast2
* mpi_bcast_int(+Root,+Data,+Tag).
*/
static int
static YAP_Bool
my_bcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) {
int root;
int k,worldsize;
@ -802,7 +805,7 @@ my_bcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) {
//The arguments should be bound
if(YAP_IsVarTerm(t2) || !YAP_IsIntTerm(t1) || !YAP_IsIntTerm(t3)) {
return FALSE;
return false;
}
CONT_TIMER();
@ -819,19 +822,19 @@ my_bcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) {
MSG_SENT(len);
if(MPI_CALL(MPI_Send( str, len, MPI_CHAR, k, tag, MPI_COMM_WORLD))!=MPI_SUCCESS) {
PAUSE_TIMER();
return(FALSE);
return false;
}
#ifdef DEBUG
write_msg(__FUNCTION__,__FILE__,__LINE__,"bcast2(%s,%u, MPI_CHAR,%d,%d)\n",str,len,k,tag);
#endif
}
PAUSE_TIMER();
RETURN(TRUE);
return true;
}
/*
* mpi_bcast(+Root,+Data).
*/
static int
static YAP_Bool
mpi_bcast2(void) {
return my_bcast(YAP_ARG1,YAP_ARG2,YAP_MkIntTerm(0));
}
@ -843,7 +846,7 @@ mpi_bcast2(void) {
*
* mpi_bcast(+Root,+Data,+Tag).
*/
static int
static YAP_Bool
mpi_bcast3(void) {
return my_bcast(YAP_ARG1,YAP_ARG2,YAP_ARG3);
}
@ -852,7 +855,7 @@ mpi_bcast3(void) {
* all other processes of the group.
* mpi_ibcast(+Root,+Data,+Tag).
*/
static int
static YAP_Bool
my_ibcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) {
int root;
int k,worldsize;
@ -864,7 +867,7 @@ my_ibcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) {
//fprintf(stderr,"ibcast1");
//The arguments should be bound
if(YAP_IsVarTerm(t2) || !YAP_IsIntTerm(t1) || !YAP_IsIntTerm(t3)) {
return FALSE;
return false;
}
CONT_TIMER();
@ -878,7 +881,7 @@ my_ibcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) {
b=new_broadcast();
if ( b==NULL ) {
PAUSE_TIMER();
RETURN(FALSE);
return false;
}
//fprintf(stderr,"ibcast3");
for(k=0;k<=worldsize-1;++k) {
@ -889,7 +892,7 @@ my_ibcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) {
if(MPI_CALL(MPI_Isend(str, len, MPI_CHAR, k, tag, MPI_COMM_WORLD,handle))!=MPI_SUCCESS) {
free(handle);
PAUSE_TIMER();
RETURN(FALSE);
return false;
}
new_broadcast_request(b,handle,str);
//new_request(handle,str);
@ -899,7 +902,7 @@ my_ibcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) {
if(!b->nreq)//release b if no messages were sent (worldsize==1)
free(b);
#ifdef DEBUG
#if defined(DEBUG) && defined(MALLINFO)
{
struct mallinfo s = mallinfo();
printf("%d: %d=%d/%d\n",getpid(),s.arena,s.uordblks,s.fordblks); //vsc
@ -907,7 +910,7 @@ my_ibcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) {
#endif
PAUSE_TIMER();
//fprintf(stderr,"ibcast4");
RETURN(TRUE);
return true;
}
/*
* Broadcasts a message from the process with rank "root" to
@ -916,14 +919,14 @@ my_ibcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) {
* The message is sent using MPI_Isend
* mpi_ibcast(+Root,+Data,+Tag).
*/
static int
static YAP_Bool
mpi_ibcast3(void) {
return my_ibcast(YAP_ARG1,YAP_ARG2,YAP_ARG3);
}
/*
* mpi_ibcast(+Root,+Data).
*/
static int
static YAP_Bool
mpi_ibcast2(void) {
return my_ibcast(YAP_ARG1,YAP_ARG2,YAP_MkIntTerm(0));
}
@ -947,7 +950,7 @@ gc(hashtable ht) {
handle=INT2HANDLE(node->value);
MPI_CALL(MPI_Test( handle , &flag, &status ));
if ( flag==TRUE) {
if ( flag==true) {
MPI_CALL(MPI_Wait(handle,&status));
#ifdef DEBUG
write_msg(__FUNCTION__,__FILE__,__LINE__,"Released handle...%s\n",(char*)node->obj);
@ -961,7 +964,7 @@ gc(hashtable ht) {
/*
*
*/
static int
static YAP_Bool
mpi_gc(void) {
//write_msg(__FUNCTION__,__FILE__,__LINE__,"MPI_gc>: requests=%d\n",requests->n_entries);
CONT_TIMER();
@ -971,28 +974,30 @@ mpi_gc(void) {
gc(broadcasts);
//write_msg(__FUNCTION__,__FILE__,__LINE__,"MPI_gc<: requests=%d\n",requests->n_entries);
PAUSE_TIMER();
RETURN(TRUE);
return true;
}
size_t BLOCK_SIZE=4*1024;
static int
static YAP_Bool
mpi_default_buffer_size(void)
{
YAP_Term t2;
intptr_t IBLOCK_SIZE;
if (!YAP_Unify(YAP_ARG1,YAP_MkIntTerm(BLOCK_SIZE)))
return FALSE;
return false;
t2 = YAP_ARG2;
if (YAP_IsVarTerm(t2))
return TRUE;
return true;
if (!YAP_IsIntTerm(t2))
return FALSE;
BLOCK_SIZE= YAP_IntOfTerm(t2);
if (BLOCK_SIZE < 0) {
BLOCK_SIZE=4*1024;
return FALSE;
return false;
IBLOCK_SIZE= YAP_IntOfTerm(t2);
if (IBLOCK_SIZE < 0) {
IBLOCK_SIZE=4*1024;
return false;
}
return TRUE;
BLOCK_SIZE = IBLOCK_SIZE;
return true;
}
/********************************************************************
@ -1003,7 +1008,7 @@ init_mpi(void) {
requests=new_hashtable(HASHSIZE);
broadcasts=new_hashtable(HASHSIZE);
DEL_BUFFER;
DEL_BUFFER();
YAP_UserCPredicate( "mpi_init", mpi_init,0); // mpi_init/0
#ifdef USE_THREADS
YAP_UserCPredicate( "mpi_init_rcv_thread", mpi_init_rcv_thread,1); // mpi_init_rcv_thread(+HandleMsgGoal/1)