From bd677152e7e8b57d150cba5524f8b8eba1860bee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADtor=20Santos=20Costa?= Date: Sun, 5 Feb 2012 11:20:30 +0000 Subject: [PATCH] more MPI fixes. --- C/utilpreds.c | 35 ++++++++++++++--------- docs/yap.tex | 21 +++++++++++++- library/lam_mpi.yap | 7 +++++ library/lammpi/prologterms2c.h | 3 +- library/lammpi/yap_mpi.c | 52 +++++++++++++++++++++++++++++----- 5 files changed, 95 insertions(+), 23 deletions(-) diff --git a/C/utilpreds.c b/C/utilpreds.c index 788fdf393..f8842c716 100644 --- a/C/utilpreds.c +++ b/C/utilpreds.c @@ -1655,25 +1655,34 @@ Yap_SizeOfExportedTerm(char * buf) { return bc[0]+bc[1]*sizeof(CELL); } -#define DEBUG_IMPORT 1 - -#if DEBUG_IMPORT - -static char export_debug_buf[2048]; - static Int p_export_term( USES_REGS1 ) { - Yap_ExportTerm(ARG1, export_debug_buf, 2048, 1); - return TRUE; + size_t sz = 4096, osz; + char *export_buf; + do { + export_buf = malloc(sz); + if (!export_buf) + return FALSE; + if (!(osz = Yap_ExportTerm(ARG1, export_buf, sz, 1))) { + sz += 4096; + free(export_buf); + } + } while (osz); + return Yap_unify(ARG2,MkIntegerTerm(osz)) && + Yap_unify(ARG3, MkIntegerTerm((Int)export_buf)); } static Int p_import_term( USES_REGS1 ) { - return Yap_unify(ARG1,Yap_ImportTerm(export_debug_buf)); + char *export_buf = (char *)IntegerOfTerm(Deref(ARG1)); + if (!export_buf) + return FALSE; + Int out = Yap_unify(ARG2,Yap_ImportTerm(export_buf)); + free(export_buf); + return out; } -#endif static Term vars_in_complex_term(register CELL *pt0, register CELL *pt0_end, Term inp USES_REGS) @@ -4797,10 +4806,6 @@ void Yap_InitUtilCPreds(void) Yap_InitCPred("rational_term_to_tree", 2, p_break_rational, 0); Yap_InitCPred("tree_to_rational_term", 2, p_restore_rational, 0); Yap_InitCPred("=@=", 2, p_variant, 0); -#ifdef DEBUG_IMPORT - Yap_InitCPred("import_term", 1, p_import_term, 0); - Yap_InitCPred("export_term", 1, p_export_term, 0); -#endif Yap_InitCPred("numbervars", 3, p_numbervars, 0); Yap_InitCPred("unnumbervars", 2, p_unnumbervars, 0); /* use this carefully */ @@ -4813,6 +4818,8 @@ void Yap_InitUtilCPreds(void) Yap_InitCPred("subsumes", 2, p_subsumes, 0); Yap_InitCPred("variables_within_term", 3, p_variables_within_term, 0); Yap_InitCPred("new_variables_in_term", 3, p_new_variables_in_term, 0); + Yap_InitCPred("import_term", 2, p_import_term, 0); + Yap_InitCPred("export_term", 3, p_export_term, 0); CurrentModule = cm; #ifdef DEBUG Yap_InitCPred("$force_trail_expansion", 1, p_force_trail_expansion, SafePredFlag|HiddenPredFlag); diff --git a/docs/yap.tex b/docs/yap.tex index 256a3cce1..6b59b8a05 100644 --- a/docs/yap.tex +++ b/docs/yap.tex @@ -12928,7 +12928,7 @@ able to use a regular @code{mpi_recv} to receive the messages, one should use @code{mpi_bcast2}. -@item mpi_bcast2(+@var{Root}, +@var{Data}) +@item mpi_bcast2(+@var{Root}, ?@var{Data}) @findex mpi_bcast/2 @snindex mpi_bcast/2 @cnindex mpi_bcast/2 @@ -12952,6 +12952,25 @@ to all other processes. Non-blocking operation. Broadcasts the message @var{Data} with tag @var{Tag} from the process with rank @var{Root} to all other processes. +@item mpi_default_buffer_size(-@var{OldBufferSize}, ?@var{NewBufferSize}) +@findex mpi_default_buffer_size/1 +@snindex mpi_default_buffer_size/1 +@cnindex mpi_default_buffer_size/1 + +The @var{OldBufferSize} argument unifies with the current size of the +MPI communication buffer size and sets the communication buffer size +@var{NewBufferSize}. The buffer is used for assynchronous waiting and +for broadcast receivers. Notice that buffer is local at each MPI +process. + + +@item mpi_msg_size(@var{Msg}, -@var{MsgSize}) +@findex mpi_msg_size/2 +@snindex mpi_msg_size/2 +@cnindex mpi_msg_size/2 +Unify @var{MsgSize} with the number of bytes YAP would need to send the +message @var{Msg}. + @item mpi_gc @findex mpi_gc/0 @snindex mpi_gc/0 diff --git a/library/lam_mpi.yap b/library/lam_mpi.yap index f5a715ac3..8144d2d76 100644 --- a/library/lam_mpi.yap +++ b/library/lam_mpi.yap @@ -22,7 +22,14 @@ mpi_bcast2/2, mpi_bcast2/3, mpi_barrier/0, + mpi_msg_buffer_size/2, + mpi_msg_size/2, mpi_gc/0 ]). :- load_foreign_files([yap_mpi], [], init_mpi). + +mpi_msg_size(Term, Size) :- + terms:export_term(Term, Buf, Size), + terms:import_term(Buf, _). + diff --git a/library/lammpi/prologterms2c.h b/library/lammpi/prologterms2c.h index 56a632a18..81e322139 100644 --- a/library/lammpi/prologterms2c.h +++ b/library/lammpi/prologterms2c.h @@ -67,7 +67,8 @@ void write_msg(const char *fun,const char *file, int line,const char *format, .. /********************************************************************************************* * Macros to manipulate the buffer *********************************************************************************************/ -#define BLOCK_SIZE 4*1024 + +extern int BLOCK_SIZE; // deletes the buffer (all fields) but does not release the memory of the buffer.ptr #define DEL_BUFFER {buffer.ptr=NULL;buffer.size=0;buffer.len=0;buffer.pos=0;} diff --git a/library/lammpi/yap_mpi.c b/library/lammpi/yap_mpi.c index 7d6f79f67..cbaa8b692 100644 --- a/library/lammpi/yap_mpi.c +++ b/library/lammpi/yap_mpi.c @@ -66,9 +66,6 @@ typedef struct broadcast_req BroadcastRequest; #ifdef USE_THREADS #include -//int pthread_create(pthread_t * thread, pthread_attr_t * attr, void * -// (*start_routine)(void *), void * arg); -//pthread_exit() #endif /******************************************************************** @@ -741,20 +738,29 @@ mpi_bcast(void) { int root,val; size_t len=0; char *str; + int rank; //The arguments should be bound - if(YAP_IsVarTerm(t2) || !YAP_IsIntTerm(t1)) { + if(!YAP_IsIntTerm(t1)) { return FALSE; } + MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rank)); CONT_TIMER(); root = YAP_IntOfTerm(t1); - str=term2string(NULL,&len,t2); + if (root == rank) { + str=term2string(NULL,&len,t2); #ifdef DEBUG - write_msg(__FUNCTION__,__FILE__,__LINE__,"mpi_bcast(%s,%u, MPI_CHAR,%d)\n",str,len,root); + write_msg(__FUNCTION__,__FILE__,__LINE__,"mpi_bcast(%s,%u, MPI_CHAR,%d)\n",str,len,root); #endif + } else { + RESET_BUFFER; + str = BUFFER_PTR; + len = BLOCK_SIZE; + } // send the data val=(MPI_CALL(MPI_Bcast( str, len, MPI_CHAR, root, MPI_COMM_WORLD))==MPI_SUCCESS?TRUE:FALSE); + #ifdef MPISTATS { int size; @@ -763,8 +769,18 @@ mpi_bcast(void) { } #endif PAUSE_TIMER(); + if (root != rank) { + YAP_Term out; + len=YAP_SizeOfExportedTerm(str); + // make sure we only fetch ARG3 after constructing the term + out = string2term(str,(size_t*)&len); + MSG_RECV(len); + if (!YAP_Unify(YAP_ARG2, out)) + return FALSE; + } RETURN(val); } + /* * Broadcasts a message from the process with rank "root" to * all other processes of the group. @@ -953,6 +969,28 @@ mpi_gc(void) { PAUSE_TIMER(); RETURN(TRUE); } + +int BLOCK_SIZE=4*1024; + +static int +mpi_default_buffer_size(void) +{ + YAP_Term t2; + if (!YAP_Unify(YAP_ARG1,YAP_MkIntTerm(BLOCK_SIZE))) + return FALSE; + t2 = YAP_ARG2; + if (YAP_IsVarTerm(t2)) + return TRUE; + if (!YAP_IsIntTerm(t2)) + return FALSE; + BLOCK_SIZE= YAP_IntOfTerm(t2); + if (BLOCK_SIZE < 0) { + BLOCK_SIZE=4*1024; + return FALSE; + } + return TRUE; +} + /******************************************************************** * Init *******************************************************************/ @@ -982,11 +1020,11 @@ init_mpi(void) { YAP_UserCPredicate( "mpi_bcast", mpi_bcast,2); // mpi_bcast(Root,Term) YAP_UserCPredicate( "mpi_bcast2", mpi_bcast2,2); // mpi_bcast2(Root,Term) YAP_UserCPredicate( "mpi_bcast2", mpi_bcast3,3); // mpi_bcast2(Root,Term,Tag) - YAP_UserCPredicate( "mpi_ibcast", mpi_ibcast2,2); // mpi_ibcast(Root,Term) YAP_UserCPredicate( "mpi_ibcast", mpi_ibcast3,3); // mpi_ibcast(Root,Term,Tag) YAP_UserCPredicate( "mpi_barrier", mpi_barrier,0); // mpi_barrier/0 YAP_UserCPredicate( "mpi_gc", mpi_gc,0); // mpi_gc/0 + YAP_UserCPredicate( "mpi_default_buffer_size", mpi_default_buffer_size,2); // buffer size #ifdef MPISTATS YAP_UserCPredicate( "mpi_stats", mpi_stats,7); // mpi_stats(-Time,#MsgsRecv,BytesRecv,MaxRecev,#MsgSent,BytesSent,MaxSent) YAP_UserCPredicate( "mpi_reset_stats", mpi_reset_stats,0); // cleans the timers