/*
Copyright (C) 2004,2005,2006 (Nuno A. Fonseca) <nuno.fonseca@gmail.com>

This program is free software; you can redistribute it and/or 
modify it under the terms of the GNU General Public License 
as published by the Free Software Foundation; either 
version 2 of the License, or (at your option) any later 
version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.


Last rev: $Id: yap_mpi.c,v 1.4 2006-09-28 11:42:51 vsc Exp $
Comments: YAP interface to LAM/MPI
*/
#include "config.h"
#include <stdlib.h>
#include <stdio.h>
#if HAVE_STRING_H
#include <string.h>
#endif
#if HAVE_MALLOC_H
#include <malloc.h>
#endif
#if HAVE_UNISTD_H
#include <unistd.h>
#endif
#if HAVE_SYS_TIMES_H
#include <sys/times.h>
#endif

#if HAVE_MPI_H
#include <mpi.h>

#include "prologterms2c.h"

#ifndef _yap_c_interface_h
#include <YapInterface.h>
#endif

#include "hash.h"

/*********************************************************************/
/*
 * Bookkeeping record shared by the requests that make up one broadcast.
 */
typedef struct broadcast_req {
  void *ptr;  /* memory buffer associated with the broadcast */
  int   nreq; /* number of requests associated with the broadcast */
} BroadcastRequest;

/*********************************************************************/
/*
 * Request handles travel to and from Prolog as integers: the pointer value
 * is cast to IDTYPE on the way out and cast back on the way in.
 * Arguments and expansions are parenthesised so that an expression such as
 * HANDLE2INT(base + 1) converts the whole expression, not just `base`.
 */
#define  IDTYPE long
#define  HANDLE2INT(ptr) ((IDTYPE)(ptr))
#define  INT2HANDLE(id)  ((MPI_Request*)(id))

#define BREQ2INT(ptr)  ((IDTYPE)(ptr))
#define INT2BREQ(ptr)  ((BroadcastRequest*)(ptr))

/*
 * Evaluates an MPI call, stores its return code in mpi_status and yields
 * MPI_SUCCESS, or the value returned by mpi_error() for a failing code.
 */
#define MPI_CALL(function) ((mpi_status=(function))==MPI_SUCCESS?MPI_SUCCESS:mpi_error(mpi_status))

#ifdef USE_THREADS
#include <pthread.h>
//int  pthread_create(pthread_t  *  thread, pthread_attr_t * attr, void *
//       (*start_routine)(void *), void * arg);
//pthread_exit()
#endif

/********************************************************************
 * Auxiliary data
 ********************************************************************/
/* Return code of the most recent MPI function invoked through MPI_CALL(). */
static int mpi_status;
/* Argument vector and count defined outside this file; both are handed to
   MPI_Init_thread(). */
extern char **Yap_argv;
extern int Yap_argc;

/* Size passed to new_hashtable() for both tables below. */
#define HASHSIZE 1777
/* Maps an MPI_Request handle to the message buffer registered for it
   (see new_request). */
static hashtable requests=NULL;
/* Maps an MPI_Request handle to the BroadcastRequest it belongs to
   (see new_broadcast_request). */
static hashtable broadcasts=NULL;

/********************************************************************
 * Time accounting
 ********************************************************************/
#ifdef MPISTATS

#include <sys/time.h>
#include <time.h>

/* Statistics */
static unsigned long bytes_sent;    // bytes sent (mpi headers are ignored)
static unsigned long bytes_recv;    // bytes received
static unsigned long num_msgs_sent; // number of messages sent
static unsigned long num_msgs_recv; // number of messages received
static unsigned long max_s_recv_msg;// maximum size of a message received 
static unsigned long max_s_sent_msg;// maximum size of a message sent
static double total_time_spent;     // total time spent in communication code

/* MSG ACCOUNTING */
/* RESET_STATS zeroes every counter above; MSG_SENT/MSG_RECV add `size` to the
   byte total, bump the message count and track the largest size seen.
   NOTE: `size` is expanded several times, so pass side-effect-free expressions. */
#define RESET_STATS()        {total_time_spent=0;bytes_sent=bytes_recv=num_msgs_recv=num_msgs_sent=max_s_recv_msg= max_s_sent_msg=0;}
#define MSG_SENT(size)       {bytes_sent+=size;++num_msgs_sent;if(max_s_sent_msg<size)max_s_sent_msg=size;}
#define MSG_RECV(size)       {bytes_recv+=size;++num_msgs_recv;if(max_s_recv_msg<size)max_s_recv_msg=size;}

#define MPITIME         total_time_spent

/* Timer */
/* CONT_TIMER takes a new start sample; PAUSE_TIMER takes an end sample and
   adds the interval since the last start sample to total_time_spent. */
#define CONT_TIMER()        {tstart();}
#define PAUSE_TIMER()       {tend();total_time_spent+=tval();}

/* RETURN pauses the timer itself before returning, so a PAUSE_TIMER() placed
   right before RETURN() adds the same interval a second time. */
#define RETURN(p)     {PAUSE_TIMER();return (p);}

/* Start/end samples written by tstart()/tend() and read by tval(). */
static struct timeval _tstart, _tend;
#include <sys/time.h>
#include <sys/resource.h>
#include <unistd.h>

/* Forward declaration of the module initialisation routine defined at the
   end of this file. */
void init_mpi(void);

/* Stores the user CPU time reported by getrusage() as the start sample. */
void tstart(void) {
  struct rusage usage;

  getrusage(RUSAGE_SELF, &usage);
  _tstart = usage.ru_utime;
}
/* Stores the user CPU time reported by getrusage() as the end sample. */
void tend(void) {
  struct rusage usage;

  getrusage(RUSAGE_SELF, &usage);
  _tend = usage.ru_utime;
}
/*
 * Returns, in seconds, the difference between the end and start samples.
 * A difference of exactly zero is reported as one microsecond.
 */
double tval(){
  double begin  = (double)_tstart.tv_sec + (double)_tstart.tv_usec/(1000*1000);
  double finish = (double)_tend.tv_sec + (double)_tend.tv_usec/(1000*1000);
  double elapsed = finish - begin;

  return (elapsed==0) ? 0.000001 : elapsed;
}
/*
 * Returns the statistics.
 * mpi_stats(-Time,-MsgsRecv,-BytesRecv,-MaxRecv,-MsgsSent,-BytesSent,-MaxSent).
 *
 * The values are also printed on stderr. The counters are unsigned long, so
 * they are printed with %lu (the previous %ld did not match their type).
 * Succeeds only if all seven unifications succeed.
 */
static int  mpi_stats(void){  
  fprintf(stderr,"%f  %lu %lu %lu %lu %lu %lu\n",MPITIME,num_msgs_recv,bytes_recv,max_s_recv_msg,num_msgs_sent,bytes_sent,max_s_sent_msg);
  return (YAP_Unify(YAP_ARG1, YAP_MkFloatTerm((float)(MPITIME))) && 
          YAP_Unify(YAP_ARG2, YAP_MkIntTerm((long)num_msgs_recv))      &&
          YAP_Unify(YAP_ARG3, YAP_MkIntTerm((long)bytes_recv))         &&
          YAP_Unify(YAP_ARG4, YAP_MkIntTerm((long)max_s_recv_msg))     &&
          YAP_Unify(YAP_ARG5, YAP_MkIntTerm((long)num_msgs_sent))      &&
          YAP_Unify(YAP_ARG6, YAP_MkIntTerm((long)bytes_sent))         &&
          YAP_Unify(YAP_ARG7, YAP_MkIntTerm((long)max_s_sent_msg))
          );
}
/*
 * Zeroes every statistics counter. Always succeeds.
 * mpi_reset_stats.
 */
static int
mpi_reset_stats(void) {
  RESET_STATS();
  return (TRUE);
}
#else

/* Statistics disabled: the timer and accounting macros expand to nothing
   and RETURN() is a plain return. */
#define PAUSE_TIMER()
#define CONT_TIMER()


#define RESET_STATS()        
#define MSG_SENT(size)       
#define MSG_RECV(size)       
#define RETURN(p)     {return (p);}
#endif

/********************************************************************
 * Functions to store/fetch/delete requests
 ********************************************************************/

/*
 * Registers `ptr` (the buffer of an asynchronous operation) in the requests
 * table under the handle's address. Returns whatever insere() returns.
 */
static inline int 
new_request(MPI_Request *handle,void* ptr) {
  ulong key=(ulong)HANDLE2INT(handle);

  return insere(requests,key,ptr);
}
 
/*
 * Looks up the object stored in the requests table for `handle`.
 */
static inline void* 
get_request(MPI_Request *handle) {
  ulong key=(ulong)HANDLE2INT(handle);

  return get_object(requests,key);
}

/*
 * Removes `handle` from the requests table, then releases both the object
 * that was stored for it and the handle itself.
 */
static inline void 
free_request(MPI_Request *handle) {
  ulong key=(ulong)HANDLE2INT(handle);

  free(delete(requests,key));
  free(handle);
}
/********************************************************************
 * Functions to store/fetch/delete broadcast requests
 ********************************************************************/
/*
 * Allocates a BroadcastRequest with no buffer and a request count of zero.
 * Returns NULL when the allocation fails.
 */
static inline BroadcastRequest* 
new_broadcast(void) {
  BroadcastRequest* req=(BroadcastRequest *)malloc(sizeof(BroadcastRequest));

  if ( req==NULL )
    return NULL;
  req->nreq=0;
  req->ptr=NULL;
  return req;
}
/*
 * Releases one request of a broadcast.
 *
 * The handle is removed from the broadcasts table and the request count of
 * the BroadcastRequest it referred to is decremented; when the count reaches
 * zero the shared message buffer and the BroadcastRequest are freed. The
 * handle itself is then freed.
 *
 * A handle with no table entry is left untouched instead of being
 * dereferenced (presumably delete() yields NULL for a missing key, as
 * free_request() also relies on - TODO confirm against hash.h).
 */
static inline void 
free_broadcast_request(MPI_Request *handle) {
  BroadcastRequest* b;

  b=(BroadcastRequest*)delete(broadcasts,(ulong)BREQ2INT(handle));// get the ptr to broadcast object
  if ( b==NULL ) {
    // nothing was registered for this handle, so nothing here is ours to free
    return;
  }
  b->nreq--;
  if ( !b->nreq ) {
    // all requests received
    free(b->ptr);
    free(b);
  }
  free(handle);
}
/*
 * Looks up the object stored in the broadcasts table for `handle`.
 */
static inline void* 
get_broadcast_request(MPI_Request *handle) {
  ulong key=(ulong)HANDLE2INT(handle);

  return get_object(broadcasts,key);
} 
/*
 * Attaches one more request to broadcast `b`: records `ptr` as the
 * broadcast's buffer, increments its request count and stores `b` in the
 * broadcasts table under `handle`. Returns whatever insere() returns.
 */
static inline int 
new_broadcast_request(BroadcastRequest* b,MPI_Request *handle,void* ptr) {
  ulong key=(ulong)HANDLE2INT(handle);

  b->nreq++;
  b->ptr=ptr;
  return insere(broadcasts,key,b);
}

/*********************************************************************/
static int mpi_error(int errcode){
  char err_msg[MPI_MAX_ERROR_STRING];
  int len;
  
  MPI_Error_string(errcode,&err_msg[0],&len);
  err_msg[len]='\0';
#ifdef DEBUG
  write_msg(__FUNCTION__,__FILE__,__LINE__,"MPI_Error: %s\n",err_msg);
#endif
  return errcode;
}
/********************************************************************

 ********************************************************************/
/*
 * Sets up the mpi enviromment. This function should be called before any other MPI
 * function.
 */
static int 
mpi_init(void){
  int thread_level;
  //  MPI_Init(&Yap_argc, &Yap_argv);
  MPI_Init_thread(&Yap_argc, &Yap_argv,MPI_THREAD_SINGLE,&thread_level);
#ifdef DEBUG
  write_msg(__FUNCTION__,__FILE__,__LINE__,"Thread level: %d\n",thread_level);
#endif
#ifdef MPISTATS
  RESET_STATS();
#endif
  return (TRUE);
}

#ifdef USE_THREADS
/*
 * Body of the receiver thread started by mpi_init_rcv_thread().
 * Loops forever: blocks in MPI_Probe() until a message from any source with
 * any tag is available and then calls the Prolog predicate named by
 * `handle_pred`. A failing probe is only logged.
 *
 * NOTE(review): the loop never terminates, so the final return is never
 * reached.
 */
static int 
rcv_msg_thread(char *handle_pred) {
  MPI_Status status;
  YAP_Term pred=YAP_MkAtomTerm(YAP_LookupAtom(handle_pred));

  for(;;) { 
    write_msg(__FUNCTION__,__FILE__,__LINE__,"Waiting for MPI msg\n");
    if( MPI_CALL(MPI_Probe( MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status )) != MPI_SUCCESS ) {
      write_msg(__FUNCTION__,__FILE__,__LINE__,"Error in MPI_Probe\n");
      continue;
    }
    // call handle
    write_msg(__FUNCTION__,__FILE__,__LINE__,"MPI Msg received\n");
    YAP_CallProlog(pred);
  }
  return 1;
}
/*
 * Initialises MPI and starts a detached thread running rcv_msg_thread(),
 * which calls the Prolog predicate "handle_msg" for every incoming message.
 * Fails if the thread cannot be created.
 *
 * NOTE(review): the predicate name is hard-coded here although the predicate
 * is registered with arity 1 - confirm whether the argument was meant to
 * select the handler.
 * NOTE(review): MPI is initialised with MPI_THREAD_SINGLE while a second
 * thread goes on to call MPI_Probe() - confirm the intended thread level.
 */
static int 
mpi_init_rcv_thread(void){
  char *handler_name="handle_msg";
  pthread_t  receiver;
  int thread_level;

  MPI_Init_thread(&Yap_argc, &Yap_argv,MPI_THREAD_SINGLE,&thread_level);
  if(pthread_create(&receiver,NULL,(void*)&rcv_msg_thread,handler_name)!=0)
    return (FALSE);

  pthread_detach(receiver);
  write_msg(__FUNCTION__,__FILE__,__LINE__,"Thread level: %d\n",thread_level);
  return (TRUE);
}
#endif

/*
 * Terminates the MPI execution environment. Every process must call this
 * function before exiting. Succeeds when MPI_Finalize() reports MPI_SUCCESS.
 * mpi_finalize.
 */
static int 
mpi_finalize(void){
  if (MPI_Finalize()!=MPI_SUCCESS)
    return (FALSE);
  return (TRUE);
}
/*
 * Returns the number of workers associated to the MPI_COMM_WORLD communicator.
 * mpi_comm_size(-Size).
 */
static int 
mpi_comm_size(void){
  int	size;
  MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD, &size));
  return (YAP_Unify(YAP_ARG1, YAP_MkIntTerm(size)));
}
/*
 * Returns the rank of the current process.
 * mpi_comm_rank(-Rank).
 */
static int 
mpi_comm_rank(void){
  int  rank;
  MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rank));
  return(YAP_Unify(YAP_ARG1,YAP_MkIntTerm(rank)));
}
/*
 * Returns the major and minor version of MPI.
 *  mpi_version(-Major,-Minor).
 */
static int 
mpi_version(void){
  int  major,minor;

  MPI_CALL(MPI_Get_version(&major,&minor));
  return (YAP_Unify(YAP_ARG1,YAP_MkIntTerm(major)) && YAP_Unify(YAP_ARG2,YAP_MkIntTerm(minor)));
}
/*
 * 
 * 
 */
static int 
mpi_get_processor_name(void) {
  char name[MPI_MAX_PROCESSOR_NAME];
  int length;
  MPI_CALL(MPI_Get_processor_name(name,&length));
  return (YAP_Unify(YAP_ARG1,YAP_MkAtomTerm(YAP_LookupAtom(name))));
}
/*
 * Non blocking communication function. The message is sent when possible. To check for the status of the message,
 * mpi_wait and mpi_test should be used. Until mpi_wait is called, the memory allocated for the buffer containing 
 * the message is not released.
 *
 * mpi_isend(+Data, +Destination, +Tag, -Handle).
 *
 * Fails when the arguments have the wrong mode, when the request handle
 * cannot be allocated or when MPI_Isend() reports an error. The handle is
 * only allocated after the arguments were validated and is released again if
 * the send cannot be started, so no failure path leaks it.
 */
static int 
mpi_isend(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1), 
    t2 = YAP_Deref(YAP_ARG2), 
    t3 = YAP_Deref(YAP_ARG3), 
    t4 = YAP_Deref(YAP_ARG4);
  char *str=NULL;
  int dest,tag;
  size_t len=0;
  MPI_Request *handle;

  CONT_TIMER();
  // RETURN() pauses the timer itself; no explicit PAUSE_TIMER() is needed
  if (YAP_IsVarTerm(t1) || !YAP_IsIntTerm(t2) || !YAP_IsIntTerm(t3) || !YAP_IsVarTerm(t4)) {
    RETURN(FALSE);
  }
  handle=(MPI_Request*)malloc(sizeof(MPI_Request));
  if ( handle==NULL ) {
    RETURN(FALSE);
  }
  //
  dest = YAP_IntOfTerm(t2);
  tag  = YAP_IntOfTerm(t3);
  // the data is packaged as a string
  str=term2string(NULL,&len,t1);
  MSG_SENT(len);
  // send the data 
  if( MPI_CALL(MPI_Isend( str, len, MPI_CHAR, dest, tag, MPI_COMM_WORLD ,handle)) != MPI_SUCCESS ) {
    free(handle); // never registered, so nobody else would release it
    RETURN(FALSE);
  }

#ifdef DEBUG
  write_msg(__FUNCTION__,__FILE__,__LINE__,"%s(%s,%u, MPI_CHAR,%d,%d)\n",__FUNCTION__,str,(unsigned)len,dest,tag);
#endif
  // We must associate the string to each handle
  new_request(handle,str);
  USED_BUFFER(); //  informs the prologterm2c module that the buffer is now used and should not be messed
  RETURN(YAP_Unify(t4,YAP_MkIntTerm(HANDLE2INT(handle))));// it should always succeed
}
/*
 * Blocking communication function. The message is sent immediately.
 * mpi_send(+Data, +Destination, +Tag).
 *
 * Fails when the arguments have the wrong mode or MPI_Send() reports an
 * error. A successful send is recorded in the message statistics, as the
 * other sending predicates do.
 */
static int 
mpi_send(void) {

  YAP_Term t1 = YAP_Deref(YAP_ARG1), 
    t2 = YAP_Deref(YAP_ARG2), 
    t3 = YAP_Deref(YAP_ARG3);
  char *str=NULL;
  int dest,tag;
  size_t len=0;
  int val;
  if (YAP_IsVarTerm(t1) || !YAP_IsIntTerm(t2) || !YAP_IsIntTerm(t3)) {
    return (FALSE);
  }

  CONT_TIMER();
  //
  dest = YAP_IntOfTerm(t2);
  tag  = YAP_IntOfTerm(t3);
  // the data is packaged as a string
  str=term2string(NULL,&len,t1);
#ifdef DEBUG
  write_msg(__FUNCTION__,__FILE__,__LINE__,"%s(%s,%u, MPI_CHAR,%d,%d)\n",__FUNCTION__,str,(unsigned)len,dest,tag);
#endif
  // send the data 
  val=(MPI_CALL(MPI_Send( str, len, MPI_CHAR, dest, tag, MPI_COMM_WORLD))==MPI_SUCCESS?TRUE:FALSE);
  if (val) {
    MSG_SENT(len);
  }
  // RETURN() pauses the timer itself; no explicit PAUSE_TIMER() is needed
  RETURN(val);
}
/*
 * Implements a blocking receive operation. 
 *  mpi_recv(?Source,?Tag,-Data).
 *
 * Source and Tag may be unbound, in which case any source / any tag is
 * accepted and the argument is unified with the value found by MPI_Probe().
 * Fails on wrong argument modes, on any MPI error, or when the probed
 * message reports an undefined count, tag or source.
 */
static int 
mpi_recv(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1), 
    t2 = YAP_Deref(YAP_ARG2), 
    t3 = YAP_Deref(YAP_ARG3),
    t4;
  int tag, orig;
  int len=0;
  MPI_Status status;

  //The third argument (data) must be unbound
  if(!YAP_IsVarTerm(t3)) {
    return FALSE;
  }
  /* The first argument (Source) must be bound to an integer
     (the rank of the source) or left unbound (i.e. any source
     is OK) */
  if (YAP_IsVarTerm(t1)) orig = MPI_ANY_SOURCE;
  else if( !YAP_IsIntTerm(t1) ) return (FALSE);
  else orig = YAP_IntOfTerm(t1);
  
  /* The second argument must be bound to an integer (the tag)
     or left unbound (i.e. any tag is OK) */
  if (YAP_IsVarTerm(t2)) tag = MPI_ANY_TAG;
  else if( !YAP_IsIntTerm(t2) ) return (FALSE);
  else  tag  = YAP_IntOfTerm( t2 );

  CONT_TIMER();
  // RETURN() pauses the timer itself; no explicit PAUSE_TIMER() is needed
  // probe for term' size
  if( MPI_CALL(MPI_Probe( orig, tag, MPI_COMM_WORLD, &status )) != MPI_SUCCESS) {
    RETURN(FALSE);
  }
  if( MPI_CALL(MPI_Get_count( &status, MPI_CHAR, &len )) != MPI_SUCCESS || 
      len==MPI_UNDEFINED ||
      status.MPI_TAG==MPI_UNDEFINED || 
      status.MPI_SOURCE==MPI_UNDEFINED) { 
    RETURN(FALSE);
  }
  //realloc memory buffer
  change_buffer_size(len+1);
  BUFFER_LEN=len; 
  // Already know the source from MPI_Probe()
  if( orig == MPI_ANY_SOURCE ) {
    orig = status.MPI_SOURCE;
    if( !YAP_Unify(t1, YAP_MkIntTerm(orig))) {
      RETURN(FALSE);
    }
  }
  // Already know the tag from MPI_Probe()
  if( tag == MPI_ANY_TAG ) {
    tag = status.MPI_TAG;
    if( !YAP_Unify(t2, YAP_MkIntTerm(status.MPI_TAG))) {
      RETURN(FALSE); 
    }
  }
  // Receive the message as a string
  if( MPI_CALL(MPI_Recv( BUFFER_PTR, BUFFER_LEN, MPI_CHAR,  orig, tag,
                         MPI_COMM_WORLD, &status )) != MPI_SUCCESS ) {
    /* Getting in here should never happen; it means that the probe
       succeeded, but there was a glitch with the actual content! */
    RETURN(FALSE);
  }
#ifdef DEBUG
  write_msg(__FUNCTION__,__FILE__,__LINE__,"%s(%s,%u, MPI_CHAR,%d,%d)\n",__FUNCTION__,BUFFER_PTR, BUFFER_LEN, orig, tag);
#endif
  MSG_RECV(BUFFER_LEN);
  t4=string2term(BUFFER_PTR,&BUFFER_LEN);
  RETURN(YAP_Unify(t3,t4));
}
/*
 * Implements a non-blocking receive operation. 
 * mpi_irecv(?Source,?Tag,-Handle).
 */
static int 
mpi_irecv(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1), 
    t2 = YAP_Deref(YAP_ARG2), 
    t3 = YAP_Deref(YAP_ARG3);
  int tag, orig;
  int len=0;
  MPI_Status status;
  MPI_Request *handle=(MPI_Request*)malloc(sizeof(MPI_Request));

  // The third argument (data) must be unbound
  if(!YAP_IsVarTerm(t3)) {
    //Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_receive");
    return FALSE;
  }
  /* The first argument (Source) must be bound to an integer
     (the rank of the source) or left unbound (i.e. any source
     is OK) */
  if (YAP_IsVarTerm(t1)) orig = MPI_ANY_SOURCE;
  else if( !YAP_IsIntTerm(t1) ) return (FALSE);
  else orig = YAP_IntOfTerm(t1);
  
  /* The third argument must be bound to an integer (the tag)
     or left unbound (i.e. any tag is OK) */
  if (YAP_IsVarTerm(t2)) tag = MPI_ANY_TAG;
  else if( !YAP_IsIntTerm(t2) ) return (FALSE);
  else  tag  = YAP_IntOfTerm( t2 );

  CONT_TIMER();

  // probe for the size of the term 
  if( MPI_CALL(MPI_Probe( orig, tag, MPI_COMM_WORLD, &status )) != MPI_SUCCESS ){
    PAUSE_TIMER();
    RETURN(FALSE);
  }
  //
  MPI_CALL(MPI_Get_count( &status, MPI_CHAR, &len ));
  change_buffer_size(len+1);  
  BUFFER_LEN=len; 
  // Already know the source from MPI_Probe()
  if( orig == MPI_ANY_SOURCE ) {
    orig = status.MPI_SOURCE;
    if ( YAP_Unify(t1, YAP_MkIntTerm(orig))!=TRUE ) {
      PAUSE_TIMER();
      RETURN(FALSE);
    }
  }
  // Already know the tag from MPI_Probe()
  if( tag == MPI_ANY_TAG ) {
    tag = status.MPI_TAG;
    if (YAP_Unify(t2, YAP_MkIntTerm(status.MPI_TAG))!=TRUE) {
      PAUSE_TIMER();
      RETURN(FALSE);
    }
  }
  
  if(  MPI_CALL(MPI_Irecv( BUFFER_PTR, BUFFER_LEN, MPI_CHAR, orig, tag,
		  MPI_COMM_WORLD, handle )) != MPI_SUCCESS ) {
    PAUSE_TIMER();
    RETURN(FALSE);
  }
  new_request(handle,BUFFER_PTR);
  DEL_BUFFER; //  force the realocation of a new memory block
  PAUSE_TIMER();
  RETURN(YAP_Unify(t3,YAP_MkIntTerm(HANDLE2INT(handle))));
}
/*
 *  Completes a non-blocking operation. If the operation was a send, the
 * function waits until the message is buffered or sent by the runtime
 * system. At this point the send buffer is released. If the operation
 * was a receive, it waits until the message is copied to the receive
 * buffer.
 * mpi_wait(+Handle,-Status).
 *
 * On success the request and its buffer are released and Status is unified
 * with status.MPI_ERROR. Fails if Handle is not an integer or MPI_Wait()
 * reports an error (the request is then left registered).
 */
static int 
mpi_wait(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle
    t2 = YAP_Deref(YAP_ARG2); // Status
  MPI_Status status;
  MPI_Request *handle;

  // The first argument  must be an integer (an handle)
  if(!YAP_IsIntTerm(t1)) {
    return FALSE;
  }
  handle=INT2HANDLE(YAP_IntOfTerm(t1));
  CONT_TIMER();
  // RETURN() pauses the timer itself; no explicit PAUSE_TIMER() is needed
  // wait for communication completion
  if( MPI_CALL(MPI_Wait( handle , &status )) != MPI_SUCCESS ) {
    RETURN(FALSE);
  }
  free_request(handle);
  RETURN(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR)));
}
/*
 *  Provides information regarding a handle, ie. if a communication operation has been completed.
 * If the operation has been completed the predicate succeeds with the completion status,
 * otherwise it fails.
 * mpi_test(+Handle,-Status).
*/
static int 
mpi_test(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle
           t2 = YAP_Deref(YAP_ARG2); // Status
  MPI_Status status;
  MPI_Request *handle;
  int flag;

  // The first argument (handle) must be an integer
  if(!YAP_IsIntTerm(t1)) {
    return FALSE;
  }
  CONT_TIMER();

  handle=INT2HANDLE(YAP_IntOfTerm(t1));
  //
  MPI_CALL(MPI_Test( handle , &flag, &status ));
  if( flag != TRUE ) {
    PAUSE_TIMER();
    RETURN(FALSE);
  }
  free_request(handle);
  PAUSE_TIMER();
  RETURN(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR)));
}
/*
 *  Completes a non-blocking receive: waits until the message is copied to
 * the receive buffer, converts the buffer back into a term and releases the
 * request.
 * mpi_wait_recv(+Handle,-Status,-Data).
 *
 * Fails if Handle is not an integer, if MPI_Wait() reports an error, if no
 * buffer is registered for the handle, or if either unification fails.
 *
 * The length handed to string2term() is a real size_t; passing the address
 * of an int cast to size_t* lets the callee access bytes outside the variable
 * wherever size_t is wider than int.
 */
static int 
mpi_wait_recv(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle
    t2 = YAP_Deref(YAP_ARG2), // Status
    t3 = YAP_Deref(YAP_ARG3); // data
  MPI_Status status;
  MPI_Request *handle;
  char *s;
  size_t len;
  int ret;

  // The first argument (handle) must be an integer
  if(!YAP_IsIntTerm(t1)) {
    return FALSE;
  }
  CONT_TIMER();
  // RETURN() pauses the timer itself; no explicit PAUSE_TIMER() is needed

  handle=INT2HANDLE(YAP_IntOfTerm(t1));
  // wait for communication completion
  if( MPI_CALL(MPI_Wait( handle , &status )) != MPI_SUCCESS) {
    RETURN(FALSE);
  }
  s=(char*)get_request(handle);
  if ( s==NULL ) {
    // no buffer registered for this handle: nothing to convert
    free_request(handle);
    RETURN(FALSE);
  }
  len=strlen(s);
  ret=YAP_Unify(t3,string2term(s,&len));
  MSG_RECV(len);
  free_request(handle);
  // the status is unified even when the data unification failed
  RETURN(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR)) && ret);
}
/*
 * Provides information regarding a handle, ie. if a communication operation has been completed.
 * If the operation has been completed the predicate succeeds with the completion status
 * and the received data, otherwise it fails.
 *
 * mpi_test_recv(+Handle,-Status,-Data).
 *
 * The completion flag set by MPI_Test() is checked before the receive buffer
 * is read or released: for a receive that is still in progress the predicate
 * fails and leaves the request registered. It also fails if no buffer is
 * registered for the handle.
 *
 * The length handed to string2term() is a real size_t; passing the address
 * of an int cast to size_t* lets the callee access bytes outside the variable
 * wherever size_t is wider than int.
 */
static int 
mpi_test_recv(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle
    t2 = YAP_Deref(YAP_ARG2), // Status
    t3 = YAP_Deref(YAP_ARG3); // data

  MPI_Status status;
  MPI_Request *handle;
  int flag=0,ret;
  size_t len;
  char *s;

  // The first argument (handle) must be an integer
  if(!YAP_IsIntTerm(t1)) {
    return FALSE;
  }
  CONT_TIMER();
  // RETURN() pauses the timer itself; no explicit PAUSE_TIMER() is needed

  handle=INT2HANDLE(YAP_IntOfTerm(t1));
  //
  if( MPI_CALL(MPI_Test( handle , &flag, &status ))!=MPI_SUCCESS || !flag ) {
    // error, or the operation has not completed yet
    RETURN(FALSE);
  }
  s=(char*)get_request(handle);
  if ( s==NULL ) {
    // no buffer registered for this handle: nothing to convert
    free_request(handle);
    RETURN(FALSE);
  }
  len=strlen(s);
  ret=YAP_Unify(t3,string2term(s,&len));
  free_request(handle);
  // the status is unified even when the data unification failed
  RETURN(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR)) && ret);
}
/*
 * Collective communication function that performs a barrier synchronization
 * among all processes of MPI_COMM_WORLD. Succeeds when MPI_Barrier() reports
 * MPI_SUCCESS.
 * mpi_barrier
 */
static int 
mpi_barrier(void) {
  int rc;

  CONT_TIMER();
  rc=MPI_CALL(MPI_Barrier(MPI_COMM_WORLD));
  PAUSE_TIMER();
  if (rc!=MPI_SUCCESS)
    return (FALSE);
  return (TRUE);
}
/***********************************
 * Broadcast
 ***********************************/
/*
 * Broadcasts a message from the process with rank "root" to
 *      all other processes of the group.
 *  Note: Collective communication means all processes within a communicator call the same routine.
 *       To be able to use a regular MPI_Recv to recv the messages, one should use mpi_bcast2
 *
 *  mpi_bcast(+Root,+Data).
 *
 * Fails when the arguments have the wrong mode or MPI_Bcast() reports an
 * error.
 * NOTE(review): every caller serialises its own Data and passes that length
 * to MPI_Bcast(); nothing here turns the buffer back into a term afterwards -
 * confirm how non-root processes are expected to obtain the data.
 */
static int 
mpi_bcast(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1), 
    t2 = YAP_Deref(YAP_ARG2);
  int root,val;
  size_t len=0;
  char *str;
  //The arguments should be bound
  if(YAP_IsVarTerm(t2) || !YAP_IsIntTerm(t1)) {
    return FALSE;
  }

  CONT_TIMER();
  root = YAP_IntOfTerm(t1);
  str=term2string(NULL,&len,t2);
#ifdef DEBUG
  write_msg(__FUNCTION__,__FILE__,__LINE__,"mpi_bcast(%s,%u, MPI_CHAR,%d)\n",str,(unsigned)len,root);
#endif
  // send the data 
  val=(MPI_CALL(MPI_Bcast( str, len, MPI_CHAR, root, MPI_COMM_WORLD))==MPI_SUCCESS?TRUE:FALSE);

#ifdef MPISTATS
  {
   int  size;
   MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD, &size));
   MSG_SENT(len*size);
  }
#endif
  // RETURN() pauses the timer itself; no explicit PAUSE_TIMER() is needed
  RETURN(val);
}
/*
 * Sends Data, with the given Tag, to every rank of MPI_COMM_WORLD except
 * "root", using one blocking MPI_Send() per destination. The recipients can
 * therefore use a regular MPI_Recv to receive the message.
 * Shared implementation of mpi_bcast2(+Root,+Data) and
 * mpi_bcast2(+Root,+Data,+Tag).
 *
 * Fails when the arguments have the wrong mode, when the communicator size
 * cannot be obtained, or as soon as one send reports an error (destinations
 * served before the failure have already been sent to).
 */
static int 
my_bcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) {
  int root;
  int k,worldsize=0;
  size_t len=0;
  char *str;
  int tag;

  //The arguments should be bound
  if(YAP_IsVarTerm(t2) || !YAP_IsIntTerm(t1) || !YAP_IsIntTerm(t3)) {
    return FALSE;
  }
  
  CONT_TIMER();
  // RETURN() pauses the timer itself; no explicit PAUSE_TIMER() is needed

  // without a valid size the loop bound below would be undefined
  if(MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD,&worldsize))!=MPI_SUCCESS) {
    RETURN(FALSE);
  }

  root = YAP_IntOfTerm(t1);
  tag = YAP_IntOfTerm(t3);
  str=term2string(NULL,&len,t2);
  
  for(k=0;k<=worldsize-1;++k)
    if(k!=root) {
      // Use async send?
      MSG_SENT(len);
      if(MPI_CALL(MPI_Send( str, len, MPI_CHAR, k, tag, MPI_COMM_WORLD))!=MPI_SUCCESS) {
        RETURN(FALSE);
      }
#ifdef DEBUG
      write_msg(__FUNCTION__,__FILE__,__LINE__,"bcast2(%s,%u, MPI_CHAR,%d,%d)\n",str,(unsigned)len,k,tag);
#endif
    }
  RETURN(TRUE);
}
/*
 * mpi_bcast(+Root,+Data).
 */
static int 
mpi_bcast2(void) {
  return my_bcast(YAP_ARG1,YAP_ARG2,YAP_MkIntTerm(0));
}
/*
 * Broadcasts a message from the process with rank "root" to
 *      all other processes of the group.
 * Note: Collective communication means all processes within a communicator call the same routine.
 *       To be able to use a regular MPI_Recv to recv the messages, one should use mpi_bcast2
 *
 * mpi_bcast(+Root,+Data,+Tag).
 */
static int 
mpi_bcast3(void) {
  return my_bcast(YAP_ARG1,YAP_ARG2,YAP_ARG3);
}
/*
 * Sends Data, with the given Tag, to every rank of MPI_COMM_WORLD except
 * "root", using one MPI_Isend() per destination. All the requests share a
 * single BroadcastRequest so that the message buffer is released only when
 * the last of them has been collected (see free_broadcast_request).
 * Shared implementation of mpi_ibcast(+Root,+Data) and
 * mpi_ibcast(+Root,+Data,+Tag).
 *
 * Fails when the arguments have the wrong mode, when the communicator size
 * cannot be obtained, when memory cannot be allocated, or as soon as one
 * send cannot be started. Requests started before a failure stay registered;
 * the BroadcastRequest is released here only if no request refers to it.
 */
static int 
my_ibcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) {
  int root;
  int k,worldsize=0;
  size_t len=0;
  char *str;
  int tag;
  BroadcastRequest *b;

  //The arguments should be bound
  if(YAP_IsVarTerm(t2) || !YAP_IsIntTerm(t1) || !YAP_IsIntTerm(t3)) {
    return FALSE;
  }

  CONT_TIMER();
  // RETURN() pauses the timer itself; no explicit PAUSE_TIMER() is needed

  // without a valid size the loop bound below would be undefined
  if(MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD,&worldsize))!=MPI_SUCCESS) {
    RETURN(FALSE);
  }

  root = YAP_IntOfTerm(t1);
  tag = YAP_IntOfTerm(t3);
  str = term2string(NULL,&len,t2);
  b=new_broadcast();
  if ( b==NULL ) {
    RETURN(FALSE);
  }
  for(k=0;k<=worldsize-1;++k) {
    if(k!=root) {
      MPI_Request *handle=(MPI_Request*)malloc(sizeof(MPI_Request));
      if ( handle==NULL ) {
        if(!b->nreq) // no request refers to b yet
          free(b);
        RETURN(FALSE);
      }
      MSG_SENT(len);
      // Use async send
      if(MPI_CALL(MPI_Isend(str, len, MPI_CHAR, k, tag, MPI_COMM_WORLD,handle))!=MPI_SUCCESS) {
        free(handle);
        if(!b->nreq) // no request refers to b yet
          free(b);
        RETURN(FALSE);
      }
      new_broadcast_request(b,handle,str);
      //new_request(handle,str);
      USED_BUFFER();
    }
  }
  if(!b->nreq)//release b if no messages were sent (worldsize==1)
    free(b);

#ifdef DEBUG
  {
    struct mallinfo s = mallinfo();
    printf("%d: %d=%d/%d\n",getpid(),s.arena,s.uordblks,s.fordblks); //vsc
  }
#endif
  RETURN(TRUE);
}
/*
 * Broadcasts a message from the process with rank "root" to
 *      all other processes of the group.
 * To receive the message the recipients use MPI_Recv
 * The message is sent using MPI_Isend
 * mpi_ibcast(+Root,+Data,+Tag).
 */
static int 
mpi_ibcast3(void) {
  return my_ibcast(YAP_ARG1,YAP_ARG2,YAP_ARG3);
}
/*
 * mpi_ibcast(+Root,+Data).
 */
static int 
mpi_ibcast2(void) {
  return my_ibcast(YAP_ARG1,YAP_ARG2,YAP_MkIntTerm(0));
}
/*******************************************
 * Garbage collection
 *******************************************/
/*
 * Attempts to release the request structures used in asynchronous
 * communications.
 *
 * Fetches the next node of the current traversal of `ht` and recurses before
 * examining it, so the nodes are processed from the last one back to the
 * first. The caller must have called init_hash_traversal(ht) beforehand.
 * A node whose request has completed is released with free_request() or
 * free_broadcast_request(), depending on the table.
 *
 * A node is skipped when MPI_Test() reports an error; the completion flag is
 * only trusted after a successful call and any non-zero value counts as
 * "completed".
 * NOTE(review): the recursion depth equals the number of nodes traversed.
 */
static void 
gc(hashtable ht) {
  MPI_Request *handle;
  hashnode* node;
  MPI_Status status;
  int flag=0;

  node=(hashnode*)next_hashnode(ht);
  if ( node==NULL) return;
  
  gc(ht); // start at the end
  
  handle=INT2HANDLE(node->value);
  if ( MPI_CALL(MPI_Test( handle , &flag, &status ))!=MPI_SUCCESS || !flag ) {
    return; // error or still pending: keep the request
  }
  MPI_CALL(MPI_Wait(handle,&status));
#ifdef DEBUG
  write_msg(__FUNCTION__,__FILE__,__LINE__,"Released handle...%s\n",(char*)node->obj);
#endif
  if (ht==requests)
    free_request(handle);
  else
    free_broadcast_request(handle);
}
/*
 * Releases the structures of every completed asynchronous request, first in
 * the requests table and then in the broadcasts table. Always succeeds.
 * mpi_gc.
 */
static int 
mpi_gc(void) {
  CONT_TIMER();
  init_hash_traversal(requests);
  gc(requests);
  init_hash_traversal(broadcasts);
  gc(broadcasts);
  // RETURN() pauses the timer itself; no explicit PAUSE_TIMER() is needed
  RETURN(TRUE);
}
/********************************************************************
 * Init
 *******************************************************************/
/*
 * Module initialisation: creates the two request tables, resets the
 * term/string buffer and registers every C predicate of this file with YAP,
 * in the order listed in the table below.
 */
void 
init_mpi(void) {
  static const struct {
    char *name;        // predicate name
    int (*fn)(void);   // C implementation
    int arity;         // number of arguments
  } predicates[] = {
    { "mpi_init", mpi_init, 0 },                             // mpi_init/0
#ifdef USE_THREADS
    { "mpi_init_rcv_thread", mpi_init_rcv_thread, 1 },       // mpi_init_rcv_thread(+HandleMsgGoal/1)
#endif
    { "mpi_finalize", mpi_finalize, 0 },                     // mpi_finalize/0
    { "mpi_comm_size", mpi_comm_size, 1 },                   // mpi_comm_size(-Size)
    { "mpi_comm_rank", mpi_comm_rank, 1 },                   // mpi_comm_rank(-Rank)
    { "mpi_version", mpi_version, 2 },                       // mpi_version(-Major,-Minor)
    { "mpi_get_processor_name", mpi_get_processor_name, 1 }, // mpi_get_processor_name(-Name)
    { "mpi_send", mpi_send, 3 },                             // mpi_send(+Data, +Destination, +Tag).
    { "mpi_isend", mpi_isend, 4 },                           // mpi_isend(+Data, +Destination, +Tag, -Handle).
    { "mpi_recv", mpi_recv, 3 },                             // mpi_recv(?Source,?Tag,-Data).
    { "mpi_irecv", mpi_irecv, 3 },                           // mpi_irecv(?Source,?Tag,-Handle).
    { "mpi_wait", mpi_wait, 2 },                             // mpi_wait(+Handle,-Status).
    { "mpi_wait_rcv", mpi_wait_recv, 3 },                    // mpi_wait_rcv(+Handle,-Status,-Data).
    { "mpi_test", mpi_test, 2 },                             // mpi_test(+Handle,-Status).
    { "mpi_test_rcv", mpi_test_recv, 3 },                    // mpi_test_rcv(+Handle,-Status,-Data).
    { "mpi_bcast", mpi_bcast, 2 },                           // mpi_bcast(Root,Term)
    { "mpi_bcast2", mpi_bcast2, 2 },                         // mpi_bcast2(Root,Term)
    { "mpi_bcast2", mpi_bcast3, 3 },                         // mpi_bcast2(Root,Term,Tag)
    { "mpi_ibcast", mpi_ibcast2, 2 },                        // mpi_ibcast(Root,Term)
    { "mpi_ibcast", mpi_ibcast3, 3 },                        // mpi_ibcast(Root,Term,Tag)
    { "mpi_barrier", mpi_barrier, 0 },                       // mpi_barrier/0
    { "mpi_gc", mpi_gc, 0 },                                 // mpi_gc/0
#ifdef MPISTATS
    { "mpi_stats", mpi_stats, 7 },                           // mpi_stats(-Time,#MsgsRecv,BytesRecv,MaxRecev,#MsgSent,BytesSent,MaxSent)
    { "mpi_reset_stats", mpi_reset_stats, 0 },               // cleans the timers
#endif
  };
  size_t i;

  requests=new_hashtable(HASHSIZE);
  broadcasts=new_hashtable(HASHSIZE);
  DEL_BUFFER;

  for (i=0; i<sizeof(predicates)/sizeof(predicates[0]); ++i) {
    YAP_UserCPredicate( predicates[i].name, predicates[i].fn, predicates[i].arity);
  }
#ifdef MPISTATS
  RESET_STATS();
#endif
  // Not implemented: mpi_gather(+RootRank,?SendData,?RecvData), where each
  // process (root included) sends its buffer to the root, which stores the
  // messages in rank order; and MPI_Scatter.
#ifdef DEBUG
  fprintf(stderr,"MPI  module succesfully loaded.");
  fflush(stderr);
#endif
}

#endif /* HAVE_MPI_H */