move message queues to C

This commit is contained in:
Vítor Santos Costa
2014-10-13 12:34:52 +01:00
parent 7cbcd17993
commit 3c7779ec78
15 changed files with 516 additions and 343 deletions

View File

@@ -51,6 +51,7 @@ static Int p_nodebug_locks( USES_REGS1 ) { debug_locks = 0; return TRUE; }
#include "threads.h"
/*
* This file includes the definition of threads in Yap. Threads
* are supposed to be compatible with the SWI-Prolog thread package.
@@ -130,13 +131,116 @@ allocate_new_tid(void)
return new_worker_id;
}
static bool
mboxCreate( Term namet, mbox_t *mboxp USES_REGS )
{
pthread_mutex_t *mutexp;
pthread_cond_t *condp;
struct idb_queue *msgsp;
memset(mboxp, 0, sizeof(mbox_t));
UNLOCK(GLOBAL_mboxq_lock);
condp = & mboxp->cond;
pthread_cond_init(condp, NULL);
mutexp = & mboxp->mutex;
pthread_mutex_init(mutexp, NULL);
msgsp = & mboxp->msgs;
mboxp->nmsgs = 0;
mboxp->nclients = 0;
Yap_init_tqueue(msgsp);
// match at the end, when everything is built.
mboxp->name = namet;
return true;
}
static bool
mboxDestroy( mbox_t *mboxp USES_REGS )
{
pthread_mutex_t *mutexp = &mboxp->mutex;
pthread_cond_t *condp = &mboxp->cond;
struct idb_queue *msgsp = &mboxp->msgs;
if (mboxp->nclients == 0) {
pthread_cond_destroy(condp);
pthread_mutex_destroy(mutexp);
Yap_destroy_tqueue(msgsp PASS_REGS);
// at this point, there is nothing left to unlock!
return true;
} else {
/* we have clients in the mailbox, try to wake them up one by one */
if (mboxp->nclients > 0)
mboxp->nclients = - mboxp->nclients;
pthread_cond_signal(condp);
pthread_mutex_unlock(mutexp);
return true;
}
}
static bool
mboxSend( mbox_t *mboxp, Term t USES_REGS )
{
pthread_mutex_t *mutexp = &mboxp->mutex;
pthread_cond_t *condp = &mboxp->cond;
struct idb_queue *msgsp = &mboxp->msgs;
if (mboxp->nclients < 0) {
// oops, dead mailbox
return false;
}
Yap_enqueue_tqueue(msgsp, t PASS_REGS);
mboxp->nmsgs++;
pthread_cond_broadcast(condp);
pthread_mutex_unlock(mutexp);
return true;
}
static bool
mboxReceive( mbox_t *mboxp, Term t USES_REGS )
{
pthread_mutex_t *mutexp = &mboxp->mutex;
pthread_cond_t *condp = &mboxp->cond;
struct idb_queue *msgsp = &mboxp->msgs;
bool rc;
if (mboxp->nclients >= 0) {
mboxp->nclients++;
} else {
return false; // don't try to read if someone else already closed down...
}
do {
rc = Yap_dequeue_tqueue(msgsp, t, false, true PASS_REGS);
if (rc) {
mboxp->nmsgs--;
if (mboxp->nclients > 0) {
mboxp->nclients--;
} else {
mboxp->nclients++;
return mboxDestroy( mboxp PASS_REGS);
}
pthread_mutex_unlock(mutexp);
} else {
pthread_cond_wait(condp, mutexp);
}
} while (!rc);
return rc;
}
static bool
mboxPeek( mbox_t *mboxp, Term t USES_REGS )
{
pthread_mutex_t *mutexp = &mboxp->mutex;
struct idb_queue *msgsp = &mboxp->msgs;
bool rc = Yap_dequeue_tqueue(msgsp, t, false, false PASS_REGS);
pthread_mutex_unlock(mutexp);
return rc;
}
static int
store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term *tpgoal, Term *tpdetach, Term *tpexit)
store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term tgoal, Term tdetach, Term texit)
{
CACHE_REGS
UInt pm; /* memory to be requested */
Term tmod;
Term tdetach, tgoal;
if (tsize < MinTrailSpace)
tsize = MinTrailSpace;
@@ -161,7 +265,7 @@ store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term *tpgoal
return FALSE;
}
REMOTE_ThreadHandle(new_worker_id).tgoal =
Yap_StoreTermInDB(Deref(*tpgoal), 7);
Yap_StoreTermInDB(Deref(tgoal), 7);
if (CurrentModule) {
REMOTE_ThreadHandle(new_worker_id).cmod =
@@ -169,7 +273,7 @@ store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term *tpgoal
} else {
REMOTE_ThreadHandle(new_worker_id).cmod = USER_MODULE;
}
tdetach = Deref(*tpdetach);
tdetach = Deref(tdetach);
if (IsVarTerm(tdetach)){
REMOTE_ThreadHandle(new_worker_id).tdetach =
MkAtomTerm(AtomFalse);
@@ -177,10 +281,10 @@ store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term *tpgoal
REMOTE_ThreadHandle(new_worker_id).tdetach =
tdetach;
}
tgoal = Yap_StripModule(Deref(*tpexit), &tmod);
texit = Yap_StripModule(Deref(texit), &tmod);
REMOTE_ThreadHandle(new_worker_id).texit_mod = tmod;
REMOTE_ThreadHandle(new_worker_id).texit =
Yap_StoreTermInDB(tgoal,7);
Yap_StoreTermInDB(texit,7);
REMOTE_ThreadHandle(new_worker_id).local_preds =
NULL;
REMOTE_ThreadHandle(new_worker_id).start_of_timesp =
@@ -189,6 +293,8 @@ store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term *tpgoal
NULL;
REMOTE_ScratchPad(new_worker_id).ptr =
NULL;
// reset arena info
REMOTE_GlobalArena(new_worker_id) =0;
return TRUE;
}
@@ -267,6 +373,8 @@ setup_engine(int myworker_id, int init_thread)
REMOTE_PL_local_data_p(myworker_id)->reg_cache = standard_regs;
Yap_InitExStacks(myworker_id, REMOTE_ThreadHandle(myworker_id).tsize, REMOTE_ThreadHandle(myworker_id).ssize);
REMOTE_SourceModule(myworker_id) = CurrentModule = REMOTE_ThreadHandle(myworker_id).cmod;
// create a mbox
mboxCreate( MkIntTerm(myworker_id), &REMOTE_ThreadHandle(myworker_id).mbox_handle PASS_REGS );
Yap_InitTime( myworker_id );
Yap_InitYaamRegs( myworker_id );
REFRESH_CACHE_REGS
@@ -384,7 +492,7 @@ p_thread_new_tid( USES_REGS1 )
}
static int
init_thread_engine(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term *tgoal, Term *tdetach, Term *texit)
init_thread_engine(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term tgoal, Term tdetach, Term texit)
{
return store_specs(new_worker_id, ssize, tsize, sysize, tgoal, tdetach, texit);
}
@@ -415,7 +523,7 @@ p_create_thread( USES_REGS1 )
return FALSE;
}
/* make sure we can proceed */
if (!init_thread_engine(new_worker_id, ssize, tsize, sysize, &ARG1, &ARG5, &ARG6))
if (!init_thread_engine(new_worker_id, ssize, tsize, sysize, ARG1, ARG5, ARG6))
return FALSE;
//REMOTE_ThreadHandle(new_worker_id).pthread_handle = 0L;
REMOTE_ThreadHandle(new_worker_id).id = new_worker_id;
@@ -547,7 +655,7 @@ Yap_thread_create_engine(YAP_thread_attr *ops)
pthread_setspecific(Yap_yaamregs_key, (const void *)&Yap_standard_regs);
MUTEX_LOCK(&(REMOTE_ThreadHandle(0).tlock));
}
if (!init_thread_engine(new_id, ops->ssize, ops->tsize, ops->sysize, &t, &t, &(ops->egoal)))
if (!init_thread_engine(new_id, ops->ssize, ops->tsize, ops->sysize, t, t, (ops->egoal)))
return -1;
//REMOTE_ThreadHandle(new_id).pthread_handle = 0L;
REMOTE_ThreadHandle(new_id).id = new_id;
@@ -848,6 +956,127 @@ p_cond_create( USES_REGS1 )
return Yap_unify(ARG1, MkIntegerTerm((Int)condp));
}
static Int
p_mbox_create( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = GLOBAL_named_mboxes;
if (IsVarTerm(namet)) {
char buf[256];
sprintf(buf, "$%p", mboxp);
namet = MkAtomTerm(Yap_FullLookupAtom(buf));
Yap_unify(ARG1, namet);
}
if (IsAtomTerm(namet)) {
LOCK(GLOBAL_mboxq_lock);
while( mboxp && mboxp->name != namet)
mboxp = mboxp->next;
if (mboxp) {
UNLOCK(GLOBAL_mboxq_lock);
return FALSE;
}
mboxp = (mbox_t *)Yap_AllocCodeSpace(sizeof(mbox_t));
if (mboxp == NULL) {
UNLOCK(GLOBAL_mboxq_lock);
return FALSE;
}
// global mbox, for now we'll just insert in list
mboxp->next = GLOBAL_named_mboxes;
GLOBAL_named_mboxes = mboxp;
}
return mboxCreate( namet, mboxp PASS_REGS );
}
static Int
p_mbox_destroy( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = GLOBAL_named_mboxes, *prevp;
if (IsVarTerm(namet) || !IsAtomTerm(namet))
return FALSE;
LOCK(GLOBAL_mboxq_lock);
prevp = NULL;
while( mboxp && mboxp->name != namet) {
prevp = mboxp;
mboxp = mboxp->next;
}
if (!mboxp) {
UNLOCK(GLOBAL_mboxq_lock);
return FALSE;
}
if (mboxp == GLOBAL_named_mboxes) {
GLOBAL_named_mboxes = mboxp->next;
} else {
prevp->next = mboxp->next;
}
UNLOCK(GLOBAL_mboxq_lock);
mboxDestroy(mboxp PASS_REGS);
Yap_FreeCodeSpace( (char *)mboxp );
return TRUE;
}
static mbox_t*
getMbox(Term t)
{
mbox_t* mboxp;
if (IsAtomTerm(t)) {
LOCK(GLOBAL_mboxq_lock);
mboxp = GLOBAL_named_mboxes;
while( mboxp && mboxp->name != t) {
mboxp = mboxp->next;
}
} else if (IsIntTerm(t)) {
int wid = IntOfTerm(t);
if (REMOTE(wid) &&
(REMOTE_ThreadHandle(wid).in_use || REMOTE_ThreadHandle(wid).zombie))
{
return &REMOTE_ThreadHandle(wid).mbox_handle;
} else {
return NULL;
}
} else {
return NULL;
}
if (mboxp) {
pthread_mutex_lock(& mboxp->mutex);
}
UNLOCK(GLOBAL_mboxq_lock);
return mboxp;
}
static Int
p_mbox_send( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = getMbox(namet) ;
return mboxSend(mboxp, Deref(ARG2) PASS_REGS);
}
static Int
p_mbox_receive( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = getMbox(namet) ;
return mboxReceive(mboxp, Deref(ARG2) PASS_REGS);
}
static Int
p_mbox_peek( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = getMbox(namet) ;
return mboxPeek(mboxp, Deref(ARG2) PASS_REGS);
}
static Int
p_cond_destroy( USES_REGS1 )
{
@@ -884,9 +1113,8 @@ p_cond_wait( USES_REGS1 )
{
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG2));
pthread_cond_wait(condp, &mut->m);
return TRUE;
return TRUE;
}
static Int
@@ -1112,6 +1340,11 @@ and succeeds silently.
Yap_InitCPred("$cond_signal", 1, p_cond_signal, SafePredFlag);
Yap_InitCPred("$cond_broadcast", 1, p_cond_broadcast, SafePredFlag);
Yap_InitCPred("$cond_wait", 2, p_cond_wait, SafePredFlag);
Yap_InitCPred("$message_queue_create", 1, p_mbox_create, SafePredFlag);
Yap_InitCPred("$message_queue_destroy", 1, p_mbox_destroy, SafePredFlag);
Yap_InitCPred("$message_queue_send", 2, p_mbox_send, SafePredFlag);
Yap_InitCPred("$message_queue_receive", 2, p_mbox_receive, SafePredFlag);
Yap_InitCPred("$message_queue_peek", 2, p_mbox_peek, SafePredFlag);
Yap_InitCPred("$thread_stacks", 4, p_thread_stacks, SafePredFlag);
Yap_InitCPred("$signal_thread", 1, p_thread_signal, SafePredFlag);
Yap_InitCPred("$nof_threads", 1, p_nof_threads, SafePredFlag);