diff --git a/library/lammpi/Makefile.in b/library/lammpi/Makefile.in new file mode 100644 index 000000000..a3c2740a8 --- /dev/null +++ b/library/lammpi/Makefile.in @@ -0,0 +1,71 @@ +# +# default base directory for YAP installation +# (EROOT for architecture-dependent files) +# +# +# default base directory for YAP installation +# (EROOT for architecture-dependent files) +# +prefix = @prefix@ +ROOTDIR = $(prefix) +EROOTDIR = @exec_prefix@ +# +# where the binary should be +# +BINDIR = $(EROOTDIR)/bin +# +# where YAP should look for libraries +# +LIBDIR=$(EROOTDIR)/lib/Yap +# +# +CC=@CC@ +MPI_CC=mpicc +CFLAGS= @CFLAGS@ $(YAP_EXTRAS) $(DEFS) -I$(srcdir) -I../.. -I$(srcdir)/../../include +# +# +# You shouldn't need to change what follows. +# +INSTALL=@INSTALL@ +INSTALL_DATA=@INSTALL_DATA@ +INSTALL_PROGRAM=@INSTALL_PROGRAM@ +SHELL=/bin/sh +RANLIB=@RANLIB@ +srcdir=@srcdir@ +SHLIB_CFLAGS=@SHLIB_CFLAGS@ +SHLIB_SUFFIX=@SHLIB_SUFFIX@ +CWD=$(PWD) +MPIF=`$(MPI_CC) -showme|sed "s/[^ ]*//"|sed "s/-pt/-lpt/"` +#MPIF=-L/usr/local/lib -llammpio -llamf77mpi -lmpi -llam -lutil -lpthread +# + +OBJS=yap_mpi.o hash.o prologterms2c.o +SOBJS=yap_mpi@SHLIB_SUFFIX@ + +#in some systems we just create a single object, in others we need to +# create a libray + +all: $(SOBJS) + +yap_mpi.o: $(srcdir)/yap_mpi.c $(srcdir)/yap_mpi.c + $(MPI_CC) $(CFLAGS) $(SHLIB_CFLAGS) $(MPIF) -c $(srcdir)/yap_mpi.c -o yap_mpi.o + +prologterms2c.o: $(srcdir)/prologterms2c.c $(srcdir)/prologterms2c.h + $(CC) -c $(CFLAGS) $(SHLIB_CFLAGS) $(srcdir)/prologterms2c.c -o prologterms2c.o + +hash.o: $(srcdir)/hash.c $(srcdir)/hash.h + $(CC) -c $(CFLAGS) $(SHLIB_CFLAGS) $(srcdir)/hash.c -o hash.o + +@DO_SECOND_LD@%@SHLIB_SUFFIX@: %.o +@DO_SECOND_LD@ @SHLIB_LD@ -o $@ $< + +@DO_SECOND_LD@yap_mpi@SHLIB_SUFFIX@: $(OBJS) +@DO_SECOND_LD@ @SHLIB_LD@ $(MPIF) -o yap_mpi@SHLIB_SUFFIX@ $(OBJS) + +install: all + $(INSTALL_PROGRAM) $(SOBJS) $(DESTDIR)$(LIBDIR) + +clean: + rm -f *.o *~ $(OBJS) $(SOBJS) *.BAK + + diff --git a/library/lammpi/hash.c b/library/lammpi/hash.c new file mode 100644 index 000000000..656781b2e --- /dev/null +++ b/library/lammpi/hash.c @@ -0,0 +1,205 @@ +/*===========================================================================================% + * Copyright (c) 2002,2003,2004,2005,2006 Nuno Fonseca. All rights reserved. + * This code is freely available for academic purposes. + * If you intend to use it for commercial purposes then please contact the author first. + * + * Author: Nuno Fonseca + * Date: 2002-07-10 + * $Id: hash.c,v 1.1 2006-06-02 04:16:31 nunofonseca Exp $ + * + *===========================================================================================*/ +#include +#include + +#include "hash.h" + +#define BUCKET(table,i) table->buckets[i] +#define HASHSIZE(table) table->size + +static int mhash(hashtable,ulong); +static hashnode* hash_lookup(hashtable,ulong); + + +static hashnode* hash_lookup(hashtable table,ulong key){ + + table->last_node = BUCKET(table,mhash(table,key)); /* set a pointer to the first bucket */ + while ( table->last_node != NULL ) { + if( table->last_node->value==key) return table->last_node; + table->last_node = table->last_node->next; + } + return NULL; +} +__ptr_t get_next_object(hashtable table,ulong key) +{ + if(table->last_node==NULL) + return NULL; + table->last_node = table->last_node->next; + while ( table->last_node != NULL ) { + if( table->last_node->value==key) return table->last_node->obj; + table->last_node = table->last_node->next; + } + return NULL; +} + + +/* removes the element with key 'key' and returns the object stored on him */ +__ptr_t delete(hashtable table,ulong key) +{ + __ptr_t obj; + hashnode *b,*prev=NULL; + int c=mhash(table,key); + b=BUCKET(table,c); /* set a pointer to the first bucket */ + while( b!=NULL) { + if( b->value==key){ + obj=b->obj; + if(prev==NULL) /* first element */ + BUCKET(table,c)=b->next; + else + prev->next=b->next; + free(b); + table->n_entries--; + return obj; + } + prev = b; + b = b->next; + }; + return NULL; +} + +__ptr_t replace_object(hashtable table,ulong key,__ptr_t newobj) +{ + __ptr_t old; + hashnode *b=hash_lookup(table,key); + + if(b==NULL)return NULL; + old=b->obj; + b->obj=newobj; + return old; +} + +/* looks a 'bucket' in the hashing table whith 'key' and return the + pointer to the object stored in that bucket or NULL if no bucket is found */ +__ptr_t get_object(hashtable table,ulong key){ + + hashnode *b=hash_lookup(table,key); + if(b==NULL) + return NULL; + return b->obj; +} + +/* Allocates space to a new hash table */ +hashtable new_hashtable(ulong hashsize) { + hashtable new; + register int i; + + if( (new = (hashtable)malloc(sizeof(struct hashtable_s)))==NULL) return NULL; + + if( (new->buckets = (hashnode**)malloc(sizeof(hashnode*)*hashsize))==NULL) + return NULL; + new->size=hashsize; + new->last_bucket=0; + new->last_node=NULL; + new->n_entries=0; + for(i=0;inext = BUCKET(table,ind); + new->value=key; + new->obj=obj; + BUCKET(table,ind)=new; + table->n_entries++; + return 1; +} + +void free_hashtable(hashtable table) +{ + register int i; + hashnode *n,*tmp; + //fprintf(stderr,"free_hashtable\n");fflush(stderr); + if (table==NULL) return; + for(i=0;inext; + free(tmp); + } + } + free(table->buckets); + free(table); +} +/*********************************************************************************/ +/* + * Returns all objects stored in a basket by making successive calls + */ +void init_hash_traversal(hashtable table) { + table->last_bucket=0; + table->last_node=NULL; +} +/* + * Returns all objects stored in a basket by making successive calls + */ +__ptr_t next_hash_object(hashtable table) +{ + // first time.... + if( table->last_bucket>=HASHSIZE(table)) + return NULL; + + if( table->last_node==NULL ) { + // find bucket + // find next bucket + while ( table->last_node == NULL && table->last_bucket+1last_bucket; + table->last_node = BUCKET(table,table->last_bucket); + } + if (table->last_node==NULL) + return NULL; + return table->last_node->obj; + } + // Next in bucket + table->last_node=table->last_node->next; + if (table->last_node==NULL) return next_hash_object(table); + return table->last_node->obj; +} + +/* + * Returns all hash nodes stored in a basket by making successive calls + */ +__ptr_t next_hashnode(hashtable table) +{ + // first time.... + if( table->last_bucket>=HASHSIZE(table)) + return NULL; + + if( table->last_node==NULL ) { + // find bucket + // find next bucket + while ( table->last_node == NULL && table->last_bucket+1last_bucket; + table->last_node = BUCKET(table,table->last_bucket); + } + if (table->last_node==NULL) + return NULL; + return table->last_node; + } + // Next in bucket + table->last_node=table->last_node->next; + if (table->last_node==NULL) return next_hashnode(table); + return table->last_node; +} + diff --git a/library/lammpi/hash.h b/library/lammpi/hash.h new file mode 100644 index 000000000..821cdcbe7 --- /dev/null +++ b/library/lammpi/hash.h @@ -0,0 +1,60 @@ +/*===========================================================================================% + * Copyright (c) 2002,2003,2004,2005,2006 Nuno Fonseca. All rights reserved. + * This code is freely available for academic purposes. + * If you intend to use it for commercial purposes then please contact the author first. + * + * Author: Nuno Fonseca + * Date: 2002-07-10 + * $Id: hash.h,v 1.1 2006-06-02 04:16:31 nunofonseca Exp $ + * + *===========================================================================================*/ +#ifndef HASH +#define HASH +#include +#if defined (__cplusplus) || (defined (__STDC__) && __STDC__) +#define __ptr_t void * +#else /* Not C++ or ANSI C. */ +#define __ptr_t char * +#endif /* C++ or ANSI C. */ + +#ifndef ulong +#define ulong unsigned long int +#endif + +#ifndef NULL +#define NULL 0 +#endif + + +struct bucket { + struct bucket *next; + ulong value; /* Value >=0 used as key in the hashing*/ + __ptr_t obj; /* pointer to a object*/ +}; +typedef struct bucket hashnode; + + +struct hashtable_s { + hashnode **buckets; // + ulong size; // number of buckets + ulong last_bucket; // used in searchs/ hash traversals + ulong n_entries; // number of entries in the hashtable + hashnode* last_node; +}; + +//typedef hashnode **hashtable; +typedef struct hashtable_s* hashtable; + +/* functions */ +hashtable new_hashtable(ulong hashsize); +__ptr_t get_next_object(hashtable,ulong); +__ptr_t delete(hashtable,ulong); +__ptr_t replace_object(hashtable,ulong,__ptr_t); +__ptr_t get_object(hashtable,ulong); +int insere(hashtable,ulong,__ptr_t); +void free_hashtable(hashtable); + +void init_hash_traversal(hashtable table); +__ptr_t next_hash_object(hashtable table); +__ptr_t next_hashnode(hashtable table); +#endif diff --git a/library/lammpi/prologterms2c.c b/library/lammpi/prologterms2c.c new file mode 100644 index 000000000..133f81f2a --- /dev/null +++ b/library/lammpi/prologterms2c.c @@ -0,0 +1,227 @@ +/************************************************************************ + * Copyright (c) 2004 Nuno Fonseca. All rights reserved. + * This code is freely available for academic purposes. + * If you intend to use it for commercial purposes then please contact the author first. + * + * Author: Nuno Fonseca + * File: prologterms2c.c + * Last rev: $Id: prologterms2c.c,v 1.1 2006-06-02 04:16:31 nunofonseca Exp $ + * Comments: This file provides a set of functions to convert a prolog term to a C string and back. + *************************************************************************/ +#include "prologterms2c.h" +#include +#include +#include +#include +#include + +#ifdef COMPRESS +#include "minilzo.h" +#endif + +#ifndef Quote_illegal_f +#define Quote_illegal_f 1 +#define Ignore_ops_f 2 +#define Handle_vars_f 4 +#define Use_portray_f 8 +#define To_heap_f 16 +#endif + + +#ifdef COMPRESS + +#endif + +struct buffer_ds buffer; +extern char *Yap_ErrorMessage; + +/*********************************************************************************************/ +// prototypes +void write_msg(const char *fun,const char *file, int line,const char *format, ...); + +size_t write_term_to_stream(const int fd,const YAP_Term term); +YAP_Term read_term_from_stream(const int fd); +/*********************************************************************************************/ +/* + * Writes a debug message containing the processid, function name, filename, line, and a user message + */ +void +write_msg(const char *fun,const char *file, int line, + const char *format, ...) { + va_list ap; + + va_start(ap, format); + /* Print the message to stderr */ + fprintf(stderr, + "[%d:%s in %s at line %d] ", getpid(),fun, file, line); + vfprintf(stderr, format, ap); + va_end(ap); +} +/********************************************************************************************* + * Memory handling functions + *********************************************************************************************/ +/* + * Adds 'space' to the size of the currently allocated buffer + */ +static void +expand_buffer(const size_t space ) { + char *oldblock; + + // BUFFER_PTR = realloc( BUFFER_PTR, BUFFER_SIZE + space ); + oldblock= BUFFER_PTR; + BUFFER_PTR = (char*)malloc( BUFFER_SIZE + space ); + if( BUFFER_PTR == NULL ) { + YAP_Error(0,0,"Prolog2Term: Out of memory.\n"); +#ifdef MPI + MPI_Finalize(); +#endif + YAP_Exit( 1 ); + } + memcpy(BUFFER_PTR,oldblock,BUFFER_SIZE); + + if(oldblock!=NULL) + free(oldblock); + + BUFFER_SIZE+=space; +} +/* + * Changes the size of the buffer to contain at least newsize bytes + */ +void +change_buffer_size(const size_t newsize) { + + if ( BUFFER_SIZE>=BLOCK_SIZE && BUFFER_SIZE>newsize) + return; + if(BUFFER_PTR!=NULL) + free(BUFFER_PTR); + BUFFER_PTR = (char*)malloc(newsize); + if( BUFFER_PTR == NULL ) { + YAP_Error(0,0,"Prolog2Term: Out of memory.\n"); +#ifdef MPI + MPI_Finalize(); +#endif + YAP_Exit( 1 ); + } + BUFFER_SIZE=newsize; +} +/********************************************************************************************* + * I/O functions + *********************************************************************************************/ +/* + * Function used by YAP to write a char to a string + */ +static void +p2c_putc(const int c) { + // if( buffer.size==buffer.len+1 ) + if( BUFFER_SIZE==BUFFER_LEN ) { +#ifdef DEBUG + write_msg(__FUNCTION__,__FILE__,__LINE__,"p2c_putc:buffer expanded: size=%u pos=%u len=%u\n",BUFFER_SIZE,BUFFER_POS,BUFFER_LEN); +#endif + expand_buffer( BLOCK_SIZE ); + } + BUFFER_PTR[BUFFER_LEN++] = c; +} +/* + * Function used by YAP to read a char from a string + */ +static int +p2c_getc(void) { + if( BUFFER_POS < BUFFER_LEN ) + return BUFFER_PTR[BUFFER_POS++]; + return -1; +} +/* + * Writes a term to a stream. + */ +size_t +write_term_to_stream(const int fd,const YAP_Term term) { + + RESET_BUFFER; + + YAP_Write( term, p2c_putc,3); // 3=canonical + write(fd,(void*)&BUFFER_LEN,sizeof(size_t));// write size of term + write(fd,(void*)BUFFER_PTR,BUFFER_LEN); // write term + return BUFFER_LEN; +} +/* + * Read a prolog term from a stream + * (the prolog term must have been writen by the write_term_to_stream) + */ +YAP_Term +read_term_from_stream(const int fd) { + size_t size; + + RESET_BUFFER; + read(fd,(void*)&size,sizeof(size_t)); // read the size of the term +#ifdef DEBUG + write_msg(__FUNCTION__,__FILE__,__LINE__,"read_term_from_stream>>>>size:%d\n",size); +#endif + if ( size> BUFFER_SIZE) + expand_buffer(size-BUFFER_SIZE); + read(fd,BUFFER_PTR,size); // read term from stream + return YAP_Read( p2c_getc ); +} +/********************************************************************************************* + * Conversion: Prolog Term->char[] and char[]->Prolog Term + *********************************************************************************************/ +/* + * Converts a term t into a string. + * The ascii representation of t is + * copied to ptr if it occupies less than size. + */ +char* +term2string(char *const ptr,size_t *size, const YAP_Term t) { + char *ret; + + RESET_BUFFER; + + YAP_Write( t, p2c_putc, 3 );// canonical + p2c_putc('\0'); //add terminator + + if (BUFFER_LEN<=*size) { // user allocated buffer size is ok + memcpy(ptr,BUFFER_PTR,BUFFER_LEN); // copy data to user block + ret=ptr; + *size=BUFFER_LEN; + } else { // user buffer is too small + ret=BUFFER_PTR; + *size=BUFFER_LEN; + //DEL_BUFFER; + } + return ret; +} +/* + * Converts a string with a ascci representation of a term into a Prolog term. + */ +YAP_Term +string2term(char *const ptr,const size_t *size) { + YAP_Term t; + struct buffer_ds b; + + if (BUFFER_PTR!=ptr) { // +#ifdef DEBUG + write_msg(__FUNCTION__,__FILE__,__LINE__,"copy buffer string2term\n"); +#endif + COPY_BUFFER_DS(buffer,b); // keep a copy of buffer_ds + BUFFER_PTR=ptr; // make the buffer use the buffer provided + BUFFER_LEN=*size; + BUFFER_SIZE=BUFFER_LEN; + } else { // b aux. struct. not needed + b.ptr=NULL; + } + BUFFER_POS=0; + Yap_ErrorMessage=NULL; + t = YAP_Read(p2c_getc); + if ( t==FALSE ) { + write_msg(__FUNCTION__,__FILE__,__LINE__,"FAILED string2term>>>>size:%d %d %s\n",BUFFER_SIZE,strlen(BUFFER_PTR),Yap_ErrorMessage); + exit(1); + } + + if (b.ptr!=NULL) + COPY_BUFFER_DS(b,buffer); + +#ifdef DEBUG + write_msg(__FUNCTION__,__FILE__,__LINE__,"ending: buffer(ptr=%p;size=%d;pos=%d;len=%d)\n",BUFFER_PTR,BUFFER_SIZE,BUFFER_POS,BUFFER_LEN); +#endif + + return t; +} diff --git a/library/lammpi/prologterms2c.h b/library/lammpi/prologterms2c.h new file mode 100644 index 000000000..acf75a4c8 --- /dev/null +++ b/library/lammpi/prologterms2c.h @@ -0,0 +1,85 @@ +/************************************************************************ + * Copyright (c) 2004,2005,2006 Nuno Fonseca. All rights reserved. + * This code is freely available for academic purposes. + * If you intend to use it for commercial purposes then please contact the author first. + * + * Author: Nuno Fonseca + * File: prologterms2c.c + * Last rev: $Id: prologterms2c.h,v 1.1 2006-06-02 04:16:31 nunofonseca Exp $ + * Comments: This file provides a set of functions to convert a prolog term to a C string and back. + *************************************************************************/ + +#ifndef PROLOGTERMS2C +#define PROLOGTERMS2C 1 + +#ifndef _yap_c_interface_h +#include +//#include +#endif + +#ifndef size_t +#include +#endif +#include +/* + * Converts a term t into a string. + * The ascii representation of t is + * copied to ptr if it occupies less than size. Otherwise the + * necessary memory is aloccated (dyn_ptr) and the ascii + * representation of the term is copied to there. + */ +char* term2string(char *const ptr,size_t *size, const YAP_Term t); +/* + * Converts a string with a ascci representation of a term into a term. + * The ascii representation of t is + * copied to ptr if it occupies less than *size. Otherwise the + * necessary memory is aloccated and the ascii + * representation of the term is copied to there. + */ +YAP_Term string2term(char *const ptr,const size_t *size); +/* + * Read a prolog term from a stream + * (the prolog term must have been writen by the write_term_to_stream) + */ +YAP_Term read_term_from_stream(const int fd); +/* + * Writes a term to a stream. + */ +size_t write_term_to_stream(const int fd,const YAP_Term term); +/* + * Changes the size of the buffer to contain at least newsize bytes. + * Useful to reduce the number of reallocs,mallocs, and frees + */ +void change_buffer_size(const size_t newsize); + +void write_msg(const char *fun,const char *file, int line,const char *format, ...); +/********************************************************************************************* + * Macros to manipulate the buffer + *********************************************************************************************/ +#define BLOCK_SIZE 4*1024 + +// deletes the buffer (all fields) but does not release the memory of the buffer.ptr +#define DEL_BUFFER {buffer.ptr=NULL;buffer.size=0;buffer.len=0;} +// informs the prologterm2c module that the buffer is now used and should not be messed +#define USED_BUFFER() DEL_BUFFER +// initialize buffer +#define RESET_BUFFER {buffer.len=0;change_buffer_size(BLOCK_SIZE);buffer.pos=0;} +#define BUFFER_PTR buffer.ptr +#define BUFFER_SIZE buffer.size +#define BUFFER_LEN buffer.len +#define BUFFER_POS buffer.pos +// copies two buffers +#define COPY_BUFFER_DS(src,dst) {dst.size=src.size;dst.len=src.len;dst.ptr=src.ptr;dst.pos=src.pos;} + +/********************************************************************************************* + * Buffer + *********************************************************************************************/ +struct buffer_ds { + size_t size, // size of the buffer + len; // size of the string + char *ptr; // pointer to the buffer + size_t pos; // position (used while reading) +}; +extern struct buffer_ds buffer; + +#endif diff --git a/library/lammpi/yap_mpi.c b/library/lammpi/yap_mpi.c new file mode 100644 index 000000000..e044e0c0c --- /dev/null +++ b/library/lammpi/yap_mpi.c @@ -0,0 +1,1028 @@ +/******************************************************************************************* + * + * Copyright (c) 2004 Nuno Fonseca. All rights reserved. + * This code is freely available for academic purposes. + * If you intend to use it for commercial purposes then please contact the author first. + * Author: Nuno Fonseca + * Date: 2004-05-02 + * YAP interface to LAM/MPI + * $Id: yap_mpi.c,v 1.1 2006-06-02 04:16:31 nunofonseca Exp $ + * + ******************************************************************************************/ +#include +#include +#include +#include +#include +#include + +#include + +#include "prologterms2c.h" + +#ifndef _yap_c_interface_h +#include +#endif + +#include "hash.h" + +/*********************************************************************/ +struct broadcast_req { + void *ptr; // pointer to an allocated memory buffer associated to the broadcast + int nreq; // number of requests associated to the broadcast +}; +typedef struct broadcast_req BroadcastRequest; + +/*********************************************************************/ +#define IDTYPE long +#define HANDLE2INT(ptr) (IDTYPE)ptr +#define INT2HANDLE(id) (MPI_Request*)id + +#define BREQ2INT(ptr) (IDTYPE)ptr +#define INT2BREQ(ptr) (BroadcastRequest*)ptr + +#define MPI_CALL(function) ((mpi_status=function)==MPI_SUCCESS?MPI_SUCCESS:mpi_error(mpi_status)) + +#ifdef USE_THREADS +#include +//int pthread_create(pthread_t * thread, pthread_attr_t * attr, void * +// (*start_routine)(void *), void * arg); +//pthread_exit() +#endif + +/******************************************************************** + * Auxiliary data + ********************************************************************/ +static int mpi_status; +extern char **Yap_argv; +extern int Yap_argc; + +#define HASHSIZE 1777 +static hashtable requests=NULL; +static hashtable broadcasts=NULL; + +/******************************************************************** + * Time accounting + ********************************************************************/ +#ifdef MPISTATS + +#include +#include + +/* Statistics */ +static unsigned long bytes_sent; // bytes received (mpi headers are ignored) +static unsigned long bytes_recv; // bytes received +static unsigned long num_msgs_sent; // number of messages sent +static unsigned long num_msgs_recv; // number of messages received +static unsigned long max_s_recv_msg;// maximum size of a message received +static unsigned long max_s_sent_msg;// maximum size of a message sent +static double total_time_spent; // total time spend in communication code + +/* MSG ACCOUNTING */ +#define RESET_STATS() {total_time_spent=0;bytes_sent=bytes_recv=num_msgs_recv=num_msgs_sent=max_s_recv_msg= max_s_sent_msg=0;} +#define MSG_SENT(size) {bytes_sent+=size;++num_msgs_sent;if(max_s_sent_msg +#include +#include + +void tstart(void) { + struct rusage r; + getrusage(RUSAGE_SELF,&r); + _tstart=r.ru_utime; +} +void tend(void) { + struct rusage r; + getrusage(RUSAGE_SELF,&r); + _tend=r.ru_utime; +} +// +double tval(){ + double t1, t2,elapsed; + t1 = (double)_tstart.tv_sec + (double)_tstart.tv_usec/(1000*1000); + t2 = (double)_tend.tv_sec + (double)_tend.tv_usec/(1000*1000); + elapsed=t2-t1; + if (elapsed==0) return 0.000001; + return elapsed; +} +/* + * returns the statistics + */ +static int mpi_stats(void){ + fprintf(stderr,"%f %ld %ld %ld %ld %ld %ld\n",MPITIME,num_msgs_recv,bytes_recv,max_s_recv_msg,num_msgs_sent,bytes_sent,max_s_sent_msg); + return (YAP_Unify(YAP_ARG1, YAP_MkFloatTerm((float)(MPITIME))) && + YAP_Unify(YAP_ARG2, YAP_MkIntTerm((long)num_msgs_recv)) && + YAP_Unify(YAP_ARG3, YAP_MkIntTerm((long)bytes_recv)) && + YAP_Unify(YAP_ARG4, YAP_MkIntTerm((long)max_s_recv_msg)) && + YAP_Unify(YAP_ARG5, YAP_MkIntTerm((long)num_msgs_sent)) && + YAP_Unify(YAP_ARG6, YAP_MkIntTerm((long)bytes_sent)) && + YAP_Unify(YAP_ARG7, YAP_MkIntTerm((long)max_s_sent_msg)) + ); +} +/* + * + */ +static int mpi_reset_stats(void) {RESET_STATS(); return (TRUE);} +#else + +#define PAUSE_TIMER() +#define CONT_TIMER() + + +#define RESET_STATS() +#define MSG_SENT(size) +#define MSG_RECV(size) +#define RETURN(p) {return (p);} +#endif + +/******************************************************************** + * Functions to store/fetch/delete requests + ********************************************************************/ + +static inline int +new_request(MPI_Request *handle,void* ptr) { + return insere(requests,(ulong)HANDLE2INT(handle),ptr); +} + +static inline void* +get_request(MPI_Request *handle) { + return get_object(requests,(ulong)HANDLE2INT(handle)); +} + +static inline void +free_request(MPI_Request *handle) { + void* ptr; + ptr=delete(requests,(ulong)HANDLE2INT(handle)); + free(ptr); + free(handle); +} +/******************************************************************** + * Functions to store/fetch/delete broadcast requests + ********************************************************************/ +/* + * Returns a new BroadcastRequest object + */ +static inline BroadcastRequest* +new_broadcast(void) { + BroadcastRequest* b=(BroadcastRequest *)malloc(sizeof(BroadcastRequest)); + if ( b!=NULL) { + b->ptr=NULL; + b->nreq=0; + } + // write_msg(__FUNCTION__,__FILE__,__LINE__,"new broadcast: %p\n",b); + return b; +} +/* + * + */ +static inline void +free_broadcast_request(MPI_Request *handle) { + BroadcastRequest* b; + b=(BroadcastRequest*)delete(broadcasts,(ulong)BREQ2INT(handle));// get the ptr to broadcast object + b->nreq--; + if ( !b->nreq ) { + // all requests received + free(b->ptr); + free(b); + } + // write_msg(__FUNCTION__,__FILE__,__LINE__,"free broadcast_request: %p->%p\n",b,handle); + free(handle); +} +/* + * + */ +static inline void* +get_broadcast_request(MPI_Request *handle) { + return get_object(broadcasts,(ulong)HANDLE2INT(handle)); +} +/* + * + */ +static inline int +new_broadcast_request(BroadcastRequest* b,MPI_Request *handle,void* ptr) { + b->ptr=ptr; + b->nreq++; + //write_msg(__FUNCTION__,__FILE__,__LINE__,"new broadcast_request: %p->%p\n",b,handle); + return insere(broadcasts,(ulong)HANDLE2INT(handle),b); +} + +/*********************************************************************/ +static int mpi_error(int errcode){ + char err_msg[MPI_MAX_ERROR_STRING]; + int len; + + MPI_Error_string(errcode,&err_msg[0],&len); + err_msg[len]='\0'; +#ifdef DEBUG + write_msg(__FUNCTION__,__FILE__,__LINE__,"MPI_Error: %s\n",err_msg); +#endif + return errcode; +} +/******************************************************************** + + ********************************************************************/ +/* + * Sets up the mpi enviromment. This function should be called before any other MPI + * function. + */ +static int +mpi_init(void){ + int thread_level; + // MPI_Init(&Yap_argc, &Yap_argv); + MPI_Init_thread(&Yap_argc, &Yap_argv,MPI_THREAD_SINGLE,&thread_level); +#ifdef DEBUG + write_msg(__FUNCTION__,__FILE__,__LINE__,"Thread level: %d\n",thread_level); +#endif +#ifdef MPISTATS + RESET_STATS(); +#endif + return (TRUE); +} + +#ifdef USE_THREADS +/* + * Sets up the mpi enviromment. This function should be called before any other MPI + * function. + * the argument is the name of the predicate that will be invoked when a message is received + */ +static int +rcv_msg_thread(char *handle_pred) { + YAP_Term pred=YAP_MkAtomTerm(YAP_LookupAtom(handle_pred)); + MPI_Status status; + + while(1) { + write_msg(__FUNCTION__,__FILE__,__LINE__,"Waiting for MPI msg\n"); + if( MPI_CALL(MPI_Probe( MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status )) == MPI_SUCCESS ) { + // call handle + write_msg(__FUNCTION__,__FILE__,__LINE__,"MPI Msg received\n"); + YAP_CallProlog(pred); + } else + write_msg(__FUNCTION__,__FILE__,__LINE__,"Error in MPI_Probe\n"); + } + return 1; +} +/* + * + */ +static int +mpi_init_rcv_thread(void){ + int thread_level; + // MPI_Init(&Yap_argc, &Yap_argv); + pthread_t thread; + char *arg="handle_msg"; + + MPI_Init_thread(&Yap_argc, &Yap_argv,MPI_THREAD_SINGLE,&thread_level); + if(pthread_create(&thread,NULL,(void*)&rcv_msg_thread,arg)) { + return (FALSE); + } + + pthread_detach(thread); + write_msg(__FUNCTION__,__FILE__,__LINE__,"Thread level: %d\n",thread_level); + return (TRUE); +} +#endif + +/* + *Terminates the MPI execution enviroment. Every process must call this function before + * exiting. + * mpi_comm_finalize. + */ +static int +mpi_finalize(void){ + return (MPI_Finalize()==MPI_SUCCESS?TRUE:FALSE); +} +/* + * Returns the number of workers associated to the MPI_COMM_WORLD communicator. + * mpi_comm_size(-Size). + */ +static int +mpi_comm_size(void){ + int size; + MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD, &size)); + return (YAP_Unify(YAP_ARG1, YAP_MkIntTerm(size))); +} +/* + * Returns the rank of the current process. + * mpi_comm_rank(-Rank). + */ +static int +mpi_comm_rank(void){ + int rank; + MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rank)); + return(YAP_Unify(YAP_ARG1,YAP_MkIntTerm(rank))); +} +/* + * Returns the major and minor version of MPI. + * mpi_version(-Major,-Minor). + */ +static int +mpi_version(void){ + int major,minor; + + MPI_CALL(MPI_Get_version(&major,&minor)); + return (YAP_Unify(YAP_ARG1,YAP_MkIntTerm(major)) && YAP_Unify(YAP_ARG2,YAP_MkIntTerm(minor))); +} +/* + * + * + */ +static int +mpi_get_processor_name(void) { + char name[MPI_MAX_PROCESSOR_NAME]; + int length; + MPI_CALL(MPI_Get_processor_name(name,&length)); + return (YAP_Unify(YAP_ARG1,YAP_MkAtomTerm(YAP_LookupAtom(name)))); +} +/* + * Non blocking communication function. The message is sent when possible. To check for the status of the message, + * the mpi_wait and mpi_test should be used. Until mpi_wait is called, the memory allocated for the buffer containing + * the message is not released. + * + * mpi_isend(+Data, +Destination, +Tag, -Handle). + */ +static int +mpi_isend(void) { + YAP_Term t1 = YAP_Deref(YAP_ARG1), + t2 = YAP_Deref(YAP_ARG2), + t3 = YAP_Deref(YAP_ARG3), + t4 = YAP_Deref(YAP_ARG4); + char *str=NULL; + int dest,tag; + size_t len=0; + MPI_Request *handle=(MPI_Request*)malloc(sizeof(MPI_Request)); + + CONT_TIMER(); + if ( handle==NULL ) return (FALSE); + + if (YAP_IsVarTerm(t1) || !YAP_IsIntTerm(t2) || !YAP_IsIntTerm(t3) || !YAP_IsVarTerm(t4)) { + PAUSE_TIMER(); + RETURN(FALSE); + } + // + dest = YAP_IntOfTerm(t2); + tag = YAP_IntOfTerm(t3); + // + str=term2string(NULL,&len,t1); + MSG_SENT(len); + // send the data + if( MPI_CALL(MPI_Isend( str, len, MPI_CHAR, dest, tag, MPI_COMM_WORLD ,handle)) != MPI_SUCCESS ) { + PAUSE_TIMER(); + RETURN(FALSE); + } + +#ifdef DEBUG + write_msg(__FUNCTION__,__FILE__,__LINE__,"%s(%s,%u, MPI_CHAR,%d,%d)\n",__FUNCTION__,str,len,dest,tag); +#endif + // We must associate the string to each handle + new_request(handle,str); + USED_BUFFER(); // informs the prologterm2c module that the buffer is now used and should not be messed + PAUSE_TIMER(); + RETURN(YAP_Unify(t4,YAP_MkIntTerm(HANDLE2INT(handle))));// it should always succeed +} +/* + * Blocking communication function. The message is sent immediatly. + * mpi_send(+Data, +Destination, +Tag). + */ +static int +mpi_send(void) { + + YAP_Term t1 = YAP_Deref(YAP_ARG1), + t2 = YAP_Deref(YAP_ARG2), + t3 = YAP_Deref(YAP_ARG3); + char *str=NULL; + int dest,tag; + size_t len=0; + int val; + if (YAP_IsVarTerm(t1) || !YAP_IsIntTerm(t2) || !YAP_IsIntTerm(t3)) { + return (FALSE); + } + + CONT_TIMER(); + // + dest = YAP_IntOfTerm(t2); + tag = YAP_IntOfTerm(t3); + // the data is packaged as a string + str=term2string(NULL,&len,t1); +#ifdef DEBUG + write_msg(__FUNCTION__,__FILE__,__LINE__,"%s(%s,%u, MPI_CHAR,%d,%d)\n",__FUNCTION__,str,len,dest,tag); +#endif + // send the data + val=(MPI_CALL(MPI_Send( str, len, MPI_CHAR, dest, tag, MPI_COMM_WORLD))==MPI_SUCCESS?TRUE:FALSE); + + PAUSE_TIMER(); + RETURN(val); +} +/* + * Implements a blocking receive operation. + * mpi_recv(?Source,?Tag,-Data). + */ +static int +mpi_recv(void) { + YAP_Term t1 = YAP_Deref(YAP_ARG1), + t2 = YAP_Deref(YAP_ARG2), + t3 = YAP_Deref(YAP_ARG3), + t4; + int tag, orig; + int len=0; + MPI_Status status; + + //The third argument (data) must be unbound + if(!YAP_IsVarTerm(t3)) { + return FALSE; + } + /* The first argument (Source) must be bound to an integer + (the rank of the source) or left unbound (i.e. any source + is OK) */ + if (YAP_IsVarTerm(t1)) orig = MPI_ANY_SOURCE; + else if( !YAP_IsIntTerm(t1) ) return (FALSE); + else orig = YAP_IntOfTerm(t1); + + /* The second argument must be bound to an integer (the tag) + or left unbound (i.e. any tag is OK) */ + if (YAP_IsVarTerm(t2)) tag = MPI_ANY_TAG; + else if( !YAP_IsIntTerm(t2) ) return (FALSE); + else tag = YAP_IntOfTerm( t2 ); + + CONT_TIMER(); + // probe for term' size + if( MPI_CALL(MPI_Probe( orig, tag, MPI_COMM_WORLD, &status )) != MPI_SUCCESS) { + PAUSE_TIMER(); + RETURN(FALSE); + } + if( MPI_CALL(MPI_Get_count( &status, MPI_CHAR, &len )) != MPI_SUCCESS || + status.MPI_TAG==MPI_UNDEFINED || + status.MPI_SOURCE==MPI_UNDEFINED) { + PAUSE_TIMER(); + RETURN(FALSE); + } + //realloc memory buffer + change_buffer_size(len+1); + BUFFER_LEN=len; + // Already know the source from MPI_Probe() + if( orig == MPI_ANY_SOURCE ) { + orig = status.MPI_SOURCE; + if( !YAP_Unify(t1, YAP_MkIntTerm(orig))) { + PAUSE_TIMER(); + RETURN(FALSE); + } + } + // Already know the tag from MPI_Probe() + if( tag == MPI_ANY_TAG ) { + tag = status.MPI_TAG; + if( !YAP_Unify(t2, YAP_MkIntTerm(status.MPI_TAG))) { + PAUSE_TIMER(); + RETURN(FALSE); + } + } + // Receive the message as a string + if( MPI_CALL(MPI_Recv( BUFFER_PTR, BUFFER_LEN, MPI_CHAR, orig, tag, + MPI_COMM_WORLD, &status )) != MPI_SUCCESS ) { + /* Getting in here should never happen; it means that the first + package (containing size) was sent properly, but there was a glitch with + the actual content! */ + PAUSE_TIMER(); + RETURN(FALSE); + } +#ifdef DEBUG + write_msg(__FUNCTION__,__FILE__,__LINE__,"%s(%s,%u, MPI_CHAR,%d,%d)\n",__FUNCTION__,BUFFER_PTR, BUFFER_LEN, orig, tag); +#endif + MSG_RECV(BUFFER_LEN); + t4=string2term(BUFFER_PTR,&BUFFER_LEN); + PAUSE_TIMER(); + RETURN(YAP_Unify(t3,t4)); +} +/* + * Implements a non-blocking receive operation. + * mpi_irecv(?Source,?Tag,-Handle). + */ +static int +mpi_irecv(void) { + YAP_Term t1 = YAP_Deref(YAP_ARG1), + t2 = YAP_Deref(YAP_ARG2), + t3 = YAP_Deref(YAP_ARG3); + int tag, orig; + int len=0; + MPI_Status status; + MPI_Request *handle=(MPI_Request*)malloc(sizeof(MPI_Request)); + + // The third argument (data) must be unbound + if(!YAP_IsVarTerm(t3)) { + //Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_receive"); + return FALSE; + } + /* The first argument (Source) must be bound to an integer + (the rank of the source) or left unbound (i.e. any source + is OK) */ + if (YAP_IsVarTerm(t1)) orig = MPI_ANY_SOURCE; + else if( !YAP_IsIntTerm(t1) ) return (FALSE); + else orig = YAP_IntOfTerm(t1); + + /* The third argument must be bound to an integer (the tag) + or left unbound (i.e. any tag is OK) */ + if (YAP_IsVarTerm(t2)) tag = MPI_ANY_TAG; + else if( !YAP_IsIntTerm(t2) ) return (FALSE); + else tag = YAP_IntOfTerm( t2 ); + + CONT_TIMER(); + + // probe for the size of the term + if( MPI_CALL(MPI_Probe( orig, tag, MPI_COMM_WORLD, &status )) != MPI_SUCCESS ){ + PAUSE_TIMER(); + RETURN(FALSE); + } + // + MPI_CALL(MPI_Get_count( &status, MPI_CHAR, &len )); + change_buffer_size(len+1); + BUFFER_LEN=len; + // Already know the source from MPI_Probe() + if( orig == MPI_ANY_SOURCE ) { + orig = status.MPI_SOURCE; + if ( YAP_Unify(t1, YAP_MkIntTerm(orig))!=TRUE ) { + PAUSE_TIMER(); + RETURN(FALSE); + } + } + // Already know the tag from MPI_Probe() + if( tag == MPI_ANY_TAG ) { + tag = status.MPI_TAG; + if (YAP_Unify(t2, YAP_MkIntTerm(status.MPI_TAG))!=TRUE) { + PAUSE_TIMER(); + RETURN(FALSE); + } + } + + if( MPI_CALL(MPI_Irecv( BUFFER_PTR, BUFFER_LEN, MPI_CHAR, orig, tag, + MPI_COMM_WORLD, handle )) != MPI_SUCCESS ) { + PAUSE_TIMER(); + RETURN(FALSE); + } + new_request(handle,BUFFER_PTR); + DEL_BUFFER; // force the realocation of a new memory block + PAUSE_TIMER(); + RETURN(YAP_Unify(t3,YAP_MkIntTerm(HANDLE2INT(handle)))); +} +/* + * Completes a non-blocking operation. IF the operation was a send, the + * function waits until the message is buffered or sent by the runtime + * system. At this point the send buffer is released. If the operation + * was a receive, it waits until the message is copied to the receive + * buffer. + * mpi_wait(+Handle,-Status). +*/ +static int +mpi_wait(void) { + YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle + t2 = YAP_Deref(YAP_ARG2); // Status + MPI_Status status; + MPI_Request *handle; + + // The first argument must be an integer (an handle) + if(!YAP_IsIntTerm(t1)) { + return FALSE; + } + handle=INT2HANDLE(YAP_IntOfTerm(t1)); + CONT_TIMER(); + // probe for term' size + if( MPI_CALL(MPI_Wait( handle , &status )) != MPI_SUCCESS ) { + PAUSE_TIMER(); + RETURN(FALSE); + } + free_request(handle); + PAUSE_TIMER(); + RETURN(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR))); +} +/* + * Provides information regarding a handle, ie. if a communication operation has been completed. + * If the operation has been completed the predicate succeeds with the completion status, + * otherwise it fails. + * mpi_test(+Handle,-Status). +*/ +static int +mpi_test(void) { + YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle + t2 = YAP_Deref(YAP_ARG2); // Status + MPI_Status status; + MPI_Request *handle; + int flag; + + // The first argument (handle) must be an integer + if(!YAP_IsIntTerm(t1)) { + return FALSE; + } + CONT_TIMER(); + + handle=INT2HANDLE(YAP_IntOfTerm(t1)); + // + MPI_CALL(MPI_Test( handle , &flag, &status )); + if( flag != TRUE ) { + PAUSE_TIMER(); + RETURN(FALSE); + } + free_request(handle); + PAUSE_TIMER(); + RETURN(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR))); +} +/* + * Completes a non-blocking operation. IF the operation was a send, the + * function waits until the message is buffered or sent by the runtime + * system. At this point the send buffer is released. If the operation + * was a receive, it waits until the message is copied to the receive + * buffer. + * mpi_wait(+Handle,-Status,-Data). + */ +static int +mpi_wait_recv(void) { + YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle + t2 = YAP_Deref(YAP_ARG2), // Status + t3 = YAP_Deref(YAP_ARG3); // data + MPI_Status status; + MPI_Request *handle; + char *s; + int len,ret; + + // The first argument (handle) must be an integer + if(!YAP_IsIntTerm(t1)) { + return FALSE; + } + CONT_TIMER(); + + handle=INT2HANDLE(YAP_IntOfTerm(t1)); + // wait for communication completion + if( MPI_CALL(MPI_Wait( handle , &status )) != MPI_SUCCESS) { + PAUSE_TIMER(); + RETURN(FALSE); + } + s=(char*)get_request(handle); + len=strlen(s); + ret=YAP_Unify(t3,string2term(s,(size_t*)&len)); + MSG_RECV(len); + free_request(handle); + PAUSE_TIMER(); + RETURN(ret & YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR))); +} +/* + * Provides information regarding a handle, ie. if a communication operation has been completed. + * If the operation has been completed the predicate succeeds with the completion status, + * otherwise it fails. + * + * mpi_test(+Handle,-Status,-Data). + */ +static int +mpi_test_recv(void) { + YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle + t2 = YAP_Deref(YAP_ARG2), // Status + t3 = YAP_Deref(YAP_ARG3); // data + + MPI_Status status; + MPI_Request *handle; + int flag,len,ret; + char *s; + + // The first argument (handle) must be an integer + if(!YAP_IsIntTerm(t1)) { + return FALSE; + } + CONT_TIMER(); + + handle=INT2HANDLE(YAP_IntOfTerm(t1)); + // + if( MPI_CALL(MPI_Test( handle , &flag, &status ))!=MPI_SUCCESS) { + PAUSE_TIMER(); + RETURN(FALSE); + } + s=(char*)get_request(handle); + len=strlen(s); + ret=YAP_Unify(t3,string2term(s,(size_t*)&len)); + free_request(handle); + PAUSE_TIMER(); + RETURN(ret & YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR))); +} +/* + * Collective communication function that performs a barrier synchronization among all processes. + * mpi_barrier + */ +static int +mpi_barrier(void) { + CONT_TIMER(); + int ret=MPI_CALL(MPI_Barrier(MPI_COMM_WORLD)); + PAUSE_TIMER(); + return (ret==MPI_SUCCESS?TRUE:FALSE); +} +/*********************************** + * Broadcast + ***********************************/ +/* + * Broadcasts a message from the process with rank "root" to + * all other processes of the group. + * Note: Collective communication means all processes within a communicator call the same routine. + * To be able to use a regular MPI_Recv to recv the messages, one should use mpi_bcast2 + * + * mpi_bcast(+Root,+Data). + */ +static int +mpi_bcast(void) { + YAP_Term t1 = YAP_Deref(YAP_ARG1), + t2 = YAP_Deref(YAP_ARG2); + int root,val; + size_t len=0; + char *str; + //The arguments should be bound + if(YAP_IsVarTerm(t2) || !YAP_IsIntTerm(t1)) { + return FALSE; + } + + CONT_TIMER(); + root = YAP_IntOfTerm(t1); + str=term2string(NULL,&len,t2); +#ifdef DEBUG + write_msg(__FUNCTION__,__FILE__,__LINE__,"mpi_bcast(%s,%u, MPI_CHAR,%d)\n",str,len,root); +#endif + // send the data + val=(MPI_CALL(MPI_Bcast( str, len, MPI_CHAR, root, MPI_COMM_WORLD))==MPI_SUCCESS?TRUE:FALSE); + +#ifdef MPISTATS + { + int size; + MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD, &size)); + MSG_SENT(len*size); + } +#endif + PAUSE_TIMER(); + RETURN(val); +} +/* + * Broadcasts a message from the process with rank "root" to + * all other processes of the group. + * Note: Collective communication means all processes within a communicator call the same routine. + * To be able to use a regular MPI_Recv to recv the messages, one should use mpi_bcast2 + * mpi_bcast_int(+Root,+Data,+Tag). + */ +static int +my_bcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) { + int root; + int k,worldsize; + size_t len=0; + char *str; + int tag; + + //The arguments should be bound + if(YAP_IsVarTerm(t2) || !YAP_IsIntTerm(t1) || !YAP_IsIntTerm(t3)) { + return FALSE; + } + + CONT_TIMER(); + + MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD,&worldsize)); + + root = YAP_IntOfTerm(t1); + tag = YAP_IntOfTerm(t3); + str=term2string(NULL,&len,t2); + + for(k=0;k<=worldsize-1;++k) + if(k!=root) { + // Use async send? + MSG_SENT(len); + if(MPI_CALL(MPI_Send( str, len, MPI_CHAR, k, tag, MPI_COMM_WORLD))!=MPI_SUCCESS) { + PAUSE_TIMER(); + return(FALSE); + } +#ifdef DEBUG + write_msg(__FUNCTION__,__FILE__,__LINE__,"bcast2(%s,%u, MPI_CHAR,%d,%d)\n",str,len,k,tag); +#endif + } + PAUSE_TIMER(); + RETURN(TRUE); +} +/* + * mpi_bcast(+Root,+Data). + */ +static int +mpi_bcast2() { + return my_bcast(YAP_ARG1,YAP_ARG2,YAP_MkIntTerm(0)); +} +/* + * Broadcasts a message from the process with rank "root" to + * all other processes of the group. + * Note: Collective communication means all processes within a communicator call the same routine. + * To be able to use a regular MPI_Recv to recv the messages, one should use mpi_bcast2 + * + * mpi_bcast(+Root,+Data,+Tag). + */ +static int +mpi_bcast3(void) { + return my_bcast(YAP_ARG1,YAP_ARG2,YAP_ARG3); +} +/* + * Broadcasts a message from the process with rank "root" to + * all other processes of the group. + * mpi_ibcast(+Root,+Data,+Tag). + */ +static int +my_ibcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) { + int root; + int k,worldsize; + size_t len=0; + char *str; + int tag; + BroadcastRequest *b; + + //fprintf(stderr,"ibcast1"); + //The arguments should be bound + if(YAP_IsVarTerm(t2) || !YAP_IsIntTerm(t1) || !YAP_IsIntTerm(t3)) { + return FALSE; + } + + CONT_TIMER(); + + // fprintf(stderr,"ibcast2"); + MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD,&worldsize)); + + root = YAP_IntOfTerm(t1); + tag = YAP_IntOfTerm(t3); + str = term2string(NULL,&len,t2); + b=new_broadcast(); + if ( b==NULL ) { + PAUSE_TIMER(); + RETURN(FALSE); + } + //fprintf(stderr,"ibcast3"); + for(k=0;k<=worldsize-1;++k) { + if(k!=root) { + MPI_Request *handle=(MPI_Request*)malloc(sizeof(MPI_Request)); + MSG_SENT(len); + // Use async send + if(MPI_CALL(MPI_Isend(str, len, MPI_CHAR, k, tag, MPI_COMM_WORLD,handle))!=MPI_SUCCESS) { + free(handle); + PAUSE_TIMER(); + RETURN(FALSE); + } + new_broadcast_request(b,handle,str); + //new_request(handle,str); + USED_BUFFER(); + } + } + if(!b->nreq)//release b if no messages were sent (worldsize==1) + free(b); + +#ifdef DEBUG + { + struct mallinfo s = mallinfo(); + printf("%d: %d=%d/%d\n",getpid(),s.arena,s.uordblks,s.fordblks); //vsc + } +#endif + PAUSE_TIMER(); + //fprintf(stderr,"ibcast4"); + RETURN(TRUE); +} +/* + * Broadcasts a message from the process with rank "root" to + * all other processes of the group. + * To receive the message the recipients use MPI_Recv + * The message is sent using MPI_Isend + * mpi_ibcast(+Root,+Data,+Tag). + */ +static int +mpi_ibcast3(void) { + return my_ibcast(YAP_ARG1,YAP_ARG2,YAP_ARG3); +} +/* + * mpi_ibcast(+Root,+Data). + */ +static int +mpi_ibcast2() { + return my_ibcast(YAP_ARG1,YAP_ARG2,YAP_MkIntTerm(0)); +} +/******************************************* + * Garbage collection + *******************************************/ +/* + * Attempts to release the requests structures used in asynchronous communications + */ +static void +gc(hashtable ht) { + MPI_Request *handle; + hashnode* node; + MPI_Status status; + int flag; + + node=(hashnode*)next_hashnode(ht); + if ( node==NULL) return; + + gc(ht); // start at the end + + handle=INT2HANDLE(node->value); + MPI_CALL(MPI_Test( handle , &flag, &status )); + if ( flag==TRUE) { + MPI_CALL(MPI_Wait(handle,&status)); +#ifdef DEBUG + write_msg(__FUNCTION__,__FILE__,__LINE__,"Released handle...%s\n",(char*)node->obj); +#endif + if (ht==requests) + free_request(handle); + else + free_broadcast_request(handle); + } +} +/* + * + */ +static int +mpi_gc(void) { + //write_msg(__FUNCTION__,__FILE__,__LINE__,"MPI_gc>: requests=%d\n",requests->n_entries); + CONT_TIMER(); + init_hash_traversal(requests); + gc(requests); + init_hash_traversal(broadcasts); + gc(broadcasts); + //write_msg(__FUNCTION__,__FILE__,__LINE__,"MPI_gc<: requests=%d\n",requests->n_entries); + PAUSE_TIMER(); + RETURN(TRUE); +} +/******************************************************************** + * Init + *******************************************************************/ +void +init_mpi(void) { + + requests=new_hashtable(HASHSIZE); + broadcasts=new_hashtable(HASHSIZE); + DEL_BUFFER; + YAP_UserCPredicate( "mpi_init", mpi_init,0); // mpi_init/0 +#ifdef USE_THREADS + YAP_UserCPredicate( "mpi_init_rcv_thread", mpi_init_rcv_thread,1); // mpi_init_rcv_thread(+HandleMsgGoal/1) +#endif + YAP_UserCPredicate( "mpi_finalize", mpi_finalize,0); // mpi_finalize turn + YAP_UserCPredicate( "mpi_comm_size", mpi_comm_size,1); // mpi_comm_size(-Size) + YAP_UserCPredicate( "mpi_comm_rank", mpi_comm_rank,1); // mpi_comm_rank(-Rank) + YAP_UserCPredicate( "mpi_version", mpi_version,2); // mpi_version(-Major,-Minor) + YAP_UserCPredicate( "mpi_get_processor_name", mpi_get_processor_name,1); // mpi_get_processor_name(-Name) + YAP_UserCPredicate( "mpi_send", mpi_send,3); // mpi_send(+Data, +Destination, +Tag). + YAP_UserCPredicate( "mpi_isend",mpi_isend,4); + YAP_UserCPredicate( "mpi_recv", mpi_recv,3); // mpi_recv(?Source,?Tag,-Data). + YAP_UserCPredicate( "mpi_irecv", mpi_irecv,3); // mpi_irecv(?Source,?Tag,-Handle). + YAP_UserCPredicate( "mpi_wait", mpi_wait,2); // mpi_wait(+Handle,-Status). + YAP_UserCPredicate( "mpi_wait_rcv", mpi_wait_recv,3); // mpi_wait_recv(+Handle,-Status,-Data). + YAP_UserCPredicate( "mpi_test", mpi_test,2); // mpi_test(+Handle,-Status). + YAP_UserCPredicate( "mpi_test_rcv", mpi_test_recv,3); // mpi_test(+Handle,-Status,-Data). + YAP_UserCPredicate( "mpi_bcast", mpi_bcast,2); // mpi_bcast(Root,Term) + YAP_UserCPredicate( "mpi_bcast2", mpi_bcast2,2); // mpi_bcast2(Root,Term) + YAP_UserCPredicate( "mpi_bcast2", mpi_bcast3,3); // mpi_bcast2(Root,Term,Tag) + + YAP_UserCPredicate( "mpi_ibcast", mpi_ibcast2,2); // mpi_ibcast(Root,Term) + YAP_UserCPredicate( "mpi_ibcast", mpi_ibcast3,3); // mpi_ibcast(Root,Term,Tag) + YAP_UserCPredicate( "mpi_barrier", mpi_barrier,0); // mpi_barrier/0 + YAP_UserCPredicate( "mpi_gc", mpi_gc,0); // mpi_gc/0 +#ifdef MPISTATS + YAP_UserCPredicate( "mpi_stats", mpi_stats,7); // mpi_stats(-Time,#MsgsRecv,BytesRecv,MaxRecev,#MsgSent,BytesSent,MaxSent) + YAP_UserCPredicate( "mpi_reset_stats", mpi_reset_stats,0); // cleans the timers + RESET_STATS(); +#endif + // YAP_UserCPredicate( "mpi_gather", mpi_gather,0); //mpi_gather(+RootRank,?SendData,?RecvData) + // Each process (root process included) sends the contents of its send buffer to the root process. The root process receives the messages and stores them in rank order. The outcome is as if each of the n processes in the group (including the root process) had executed a call to MPI_Send and the root had executed n calls to MPI_Recv. The receive buffer is ignored for all non-root processes. + // MPI_Scatter +#ifdef DEBUG + fprintf(stderr,"MPI module succesfully loaded."); + fflush(stderr); +#endif +}