more MPI fixes.

This commit is contained in:
Vítor Santos Costa 2012-02-05 11:20:30 +00:00
parent 6c98e37e18
commit bd677152e7
5 changed files with 95 additions and 23 deletions

View File

@ -1655,25 +1655,34 @@ Yap_SizeOfExportedTerm(char * buf) {
return bc[0]+bc[1]*sizeof(CELL);
}
#define DEBUG_IMPORT 1
#if DEBUG_IMPORT
static char export_debug_buf[2048];
static Int
p_export_term( USES_REGS1 )
{
Yap_ExportTerm(ARG1, export_debug_buf, 2048, 1);
return TRUE;
size_t sz = 4096, osz;
char *export_buf;
do {
export_buf = malloc(sz);
if (!export_buf)
return FALSE;
if (!(osz = Yap_ExportTerm(ARG1, export_buf, sz, 1))) {
sz += 4096;
free(export_buf);
}
} while (osz);
return Yap_unify(ARG2,MkIntegerTerm(osz)) &&
Yap_unify(ARG3, MkIntegerTerm((Int)export_buf));
}
static Int
p_import_term( USES_REGS1 )
{
return Yap_unify(ARG1,Yap_ImportTerm(export_debug_buf));
char *export_buf = (char *)IntegerOfTerm(Deref(ARG1));
if (!export_buf)
return FALSE;
Int out = Yap_unify(ARG2,Yap_ImportTerm(export_buf));
free(export_buf);
return out;
}
#endif
static Term vars_in_complex_term(register CELL *pt0, register CELL *pt0_end, Term inp USES_REGS)
@ -4797,10 +4806,6 @@ void Yap_InitUtilCPreds(void)
Yap_InitCPred("rational_term_to_tree", 2, p_break_rational, 0);
Yap_InitCPred("tree_to_rational_term", 2, p_restore_rational, 0);
Yap_InitCPred("=@=", 2, p_variant, 0);
#ifdef DEBUG_IMPORT
Yap_InitCPred("import_term", 1, p_import_term, 0);
Yap_InitCPred("export_term", 1, p_export_term, 0);
#endif
Yap_InitCPred("numbervars", 3, p_numbervars, 0);
Yap_InitCPred("unnumbervars", 2, p_unnumbervars, 0);
/* use this carefully */
@ -4813,6 +4818,8 @@ void Yap_InitUtilCPreds(void)
Yap_InitCPred("subsumes", 2, p_subsumes, 0);
Yap_InitCPred("variables_within_term", 3, p_variables_within_term, 0);
Yap_InitCPred("new_variables_in_term", 3, p_new_variables_in_term, 0);
Yap_InitCPred("import_term", 2, p_import_term, 0);
Yap_InitCPred("export_term", 3, p_export_term, 0);
CurrentModule = cm;
#ifdef DEBUG
Yap_InitCPred("$force_trail_expansion", 1, p_force_trail_expansion, SafePredFlag|HiddenPredFlag);

View File

@ -12928,7 +12928,7 @@ able to use a regular @code{mpi_recv} to receive the messages, one
should use @code{mpi_bcast2}.
@item mpi_bcast2(+@var{Root}, +@var{Data})
@item mpi_bcast2(+@var{Root}, ?@var{Data})
@findex mpi_bcast/2
@snindex mpi_bcast/2
@cnindex mpi_bcast/2
@ -12952,6 +12952,25 @@ to all other processes.
Non-blocking operation. Broadcasts the message @var{Data} with tag @var{Tag}
from the process with rank @var{Root} to all other processes.
@item mpi_default_buffer_size(-@var{OldBufferSize}, ?@var{NewBufferSize})
@findex mpi_default_buffer_size/1
@snindex mpi_default_buffer_size/1
@cnindex mpi_default_buffer_size/1
The @var{OldBufferSize} argument unifies with the current size of the
MPI communication buffer size and sets the communication buffer size
@var{NewBufferSize}. The buffer is used for assynchronous waiting and
for broadcast receivers. Notice that buffer is local at each MPI
process.
@item mpi_msg_size(@var{Msg}, -@var{MsgSize})
@findex mpi_msg_size/2
@snindex mpi_msg_size/2
@cnindex mpi_msg_size/2
Unify @var{MsgSize} with the number of bytes YAP would need to send the
message @var{Msg}.
@item mpi_gc
@findex mpi_gc/0
@snindex mpi_gc/0

View File

@ -22,7 +22,14 @@
mpi_bcast2/2,
mpi_bcast2/3,
mpi_barrier/0,
mpi_msg_buffer_size/2,
mpi_msg_size/2,
mpi_gc/0
]).
:- load_foreign_files([yap_mpi], [], init_mpi).
mpi_msg_size(Term, Size) :-
terms:export_term(Term, Buf, Size),
terms:import_term(Buf, _).

View File

@ -67,7 +67,8 @@ void write_msg(const char *fun,const char *file, int line,const char *format, ..
/*********************************************************************************************
* Macros to manipulate the buffer
*********************************************************************************************/
#define BLOCK_SIZE 4*1024
extern int BLOCK_SIZE;
// deletes the buffer (all fields) but does not release the memory of the buffer.ptr
#define DEL_BUFFER {buffer.ptr=NULL;buffer.size=0;buffer.len=0;buffer.pos=0;}

View File

@ -66,9 +66,6 @@ typedef struct broadcast_req BroadcastRequest;
#ifdef USE_THREADS
#include <pthread.h>
//int pthread_create(pthread_t * thread, pthread_attr_t * attr, void *
// (*start_routine)(void *), void * arg);
//pthread_exit()
#endif
/********************************************************************
@ -741,20 +738,29 @@ mpi_bcast(void) {
int root,val;
size_t len=0;
char *str;
int rank;
//The arguments should be bound
if(YAP_IsVarTerm(t2) || !YAP_IsIntTerm(t1)) {
if(!YAP_IsIntTerm(t1)) {
return FALSE;
}
MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rank));
CONT_TIMER();
root = YAP_IntOfTerm(t1);
str=term2string(NULL,&len,t2);
if (root == rank) {
str=term2string(NULL,&len,t2);
#ifdef DEBUG
write_msg(__FUNCTION__,__FILE__,__LINE__,"mpi_bcast(%s,%u, MPI_CHAR,%d)\n",str,len,root);
write_msg(__FUNCTION__,__FILE__,__LINE__,"mpi_bcast(%s,%u, MPI_CHAR,%d)\n",str,len,root);
#endif
} else {
RESET_BUFFER;
str = BUFFER_PTR;
len = BLOCK_SIZE;
}
// send the data
val=(MPI_CALL(MPI_Bcast( str, len, MPI_CHAR, root, MPI_COMM_WORLD))==MPI_SUCCESS?TRUE:FALSE);
#ifdef MPISTATS
{
int size;
@ -763,8 +769,18 @@ mpi_bcast(void) {
}
#endif
PAUSE_TIMER();
if (root != rank) {
YAP_Term out;
len=YAP_SizeOfExportedTerm(str);
// make sure we only fetch ARG3 after constructing the term
out = string2term(str,(size_t*)&len);
MSG_RECV(len);
if (!YAP_Unify(YAP_ARG2, out))
return FALSE;
}
RETURN(val);
}
/*
* Broadcasts a message from the process with rank "root" to
* all other processes of the group.
@ -953,6 +969,28 @@ mpi_gc(void) {
PAUSE_TIMER();
RETURN(TRUE);
}
int BLOCK_SIZE=4*1024;
static int
mpi_default_buffer_size(void)
{
YAP_Term t2;
if (!YAP_Unify(YAP_ARG1,YAP_MkIntTerm(BLOCK_SIZE)))
return FALSE;
t2 = YAP_ARG2;
if (YAP_IsVarTerm(t2))
return TRUE;
if (!YAP_IsIntTerm(t2))
return FALSE;
BLOCK_SIZE= YAP_IntOfTerm(t2);
if (BLOCK_SIZE < 0) {
BLOCK_SIZE=4*1024;
return FALSE;
}
return TRUE;
}
/********************************************************************
* Init
*******************************************************************/
@ -982,11 +1020,11 @@ init_mpi(void) {
YAP_UserCPredicate( "mpi_bcast", mpi_bcast,2); // mpi_bcast(Root,Term)
YAP_UserCPredicate( "mpi_bcast2", mpi_bcast2,2); // mpi_bcast2(Root,Term)
YAP_UserCPredicate( "mpi_bcast2", mpi_bcast3,3); // mpi_bcast2(Root,Term,Tag)
YAP_UserCPredicate( "mpi_ibcast", mpi_ibcast2,2); // mpi_ibcast(Root,Term)
YAP_UserCPredicate( "mpi_ibcast", mpi_ibcast3,3); // mpi_ibcast(Root,Term,Tag)
YAP_UserCPredicate( "mpi_barrier", mpi_barrier,0); // mpi_barrier/0
YAP_UserCPredicate( "mpi_gc", mpi_gc,0); // mpi_gc/0
YAP_UserCPredicate( "mpi_default_buffer_size", mpi_default_buffer_size,2); // buffer size
#ifdef MPISTATS
YAP_UserCPredicate( "mpi_stats", mpi_stats,7); // mpi_stats(-Time,#MsgsRecv,BytesRecv,MaxRecev,#MsgSent,BytesSent,MaxSent)
YAP_UserCPredicate( "mpi_reset_stats", mpi_reset_stats,0); // cleans the timers