More thread fixes, including true anonymous mqueues, worker_id for sequential,
support for arithmetic exceptions on mac, fixes for with_mutex, fixes for dangling pointers in thread termination. Uuufff...
This commit is contained in:
93
C/threads.c
93
C/threads.c
@@ -30,6 +30,7 @@ static char SccsId[] = "%W% %G%";
|
||||
#include "yapio.h"
|
||||
#include "pl-shared.h"
|
||||
#include <stdio.h>
|
||||
#include <SWI-Prolog.h>
|
||||
#if HAVE_STRING_H
|
||||
#include <string.h>
|
||||
#endif
|
||||
@@ -37,6 +38,18 @@ static char SccsId[] = "%W% %G%";
|
||||
#include "tab.macros.h"
|
||||
#endif /* TABLING */
|
||||
|
||||
|
||||
PL_blob_t PL_Message_Queue = {
|
||||
PL_BLOB_MAGIC,
|
||||
PL_BLOB_UNIQUE | PL_BLOB_NOCOPY,
|
||||
"message_queue",
|
||||
0, // release
|
||||
0, // compare
|
||||
0, // write
|
||||
0 // acquire
|
||||
};
|
||||
|
||||
|
||||
#if DEBUG_LOCKS
|
||||
|
||||
int debug_locks = TRUE;
|
||||
@@ -131,6 +144,7 @@ allocate_new_tid(void)
|
||||
return new_worker_id;
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
mboxCreate( Term namet, mbox_t *mboxp USES_REGS )
|
||||
{
|
||||
@@ -884,6 +898,7 @@ p_new_mutex( USES_REGS1 )
|
||||
return Yap_unify(ARG1, MkIntegerTerm((Int)mutp));
|
||||
}
|
||||
|
||||
|
||||
static Int
|
||||
p_destroy_mutex( USES_REGS1 )
|
||||
{
|
||||
@@ -938,6 +953,30 @@ p_unlock_mutex( USES_REGS1 )
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static Int
|
||||
p_with_with_mutex( USES_REGS1 )
|
||||
{
|
||||
if (GLOBAL_WithMutex == NULL) {
|
||||
Term t = ARG1;
|
||||
LOCK(GLOBAL_ThreadHandlesLock);
|
||||
p_new_mutex( PASS_REGS1 );
|
||||
GLOBAL_WithMutex = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
|
||||
UNLOCK(GLOBAL_ThreadHandlesLock);
|
||||
} else {
|
||||
ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex);
|
||||
}
|
||||
return p_lock_mutex( PASS_REGS1 );
|
||||
}
|
||||
|
||||
static Int
|
||||
p_unlock_with_mutex( USES_REGS1 )
|
||||
{
|
||||
ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex);
|
||||
return p_unlock_mutex( PASS_REGS1 );
|
||||
}
|
||||
|
||||
|
||||
|
||||
static Int
|
||||
p_mutex_info( USES_REGS1 )
|
||||
{
|
||||
@@ -961,6 +1000,10 @@ p_cond_create( USES_REGS1 )
|
||||
return Yap_unify(ARG1, MkIntegerTerm((Int)condp));
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
UInt indx;
|
||||
mbox_t mbox;
|
||||
} counted_mbox;
|
||||
|
||||
static Int
|
||||
p_mbox_create( USES_REGS1 )
|
||||
@@ -969,12 +1012,15 @@ p_mbox_create( USES_REGS1 )
|
||||
mbox_t* mboxp = GLOBAL_named_mboxes;
|
||||
|
||||
if (IsVarTerm(namet)) {
|
||||
char buf[256];
|
||||
sprintf(buf, "$%p", mboxp);
|
||||
namet = MkAtomTerm(Yap_FullLookupAtom(buf));
|
||||
Yap_unify(ARG1, namet);
|
||||
}
|
||||
if (IsAtomTerm(namet)) {
|
||||
AtomEntry *ae;
|
||||
counted_mbox c;
|
||||
int new;
|
||||
c.indx = GLOBAL_mbox_count++;
|
||||
ae = Yap_lookupBlob(&c, sizeof(c), &PL_Message_Queue, &new);
|
||||
namet = MkAtomTerm(RepAtom(ae));
|
||||
mboxp = &(((counted_mbox *)(ae->rep.blob[0].data))->mbox);
|
||||
Yap_unify(ARG1, namet);
|
||||
} else if (IsAtomTerm(namet)) {
|
||||
LOCK(GLOBAL_mboxq_lock);
|
||||
while( mboxp && mboxp->name != namet)
|
||||
mboxp = mboxp->next;
|
||||
@@ -1000,8 +1046,11 @@ p_mbox_destroy( USES_REGS1 )
|
||||
Term namet = Deref(ARG1);
|
||||
mbox_t* mboxp = GLOBAL_named_mboxes, *prevp;
|
||||
|
||||
if (IsVarTerm(namet) || !IsAtomTerm(namet))
|
||||
return FALSE;
|
||||
if (IsVarTerm(namet) )
|
||||
return FALSE;
|
||||
if (IsIntTerm(namet) ) {
|
||||
return FALSE;
|
||||
}
|
||||
LOCK(GLOBAL_mboxq_lock);
|
||||
prevp = NULL;
|
||||
while( mboxp && mboxp->name != namet) {
|
||||
@@ -1027,12 +1076,18 @@ static mbox_t*
|
||||
getMbox(Term t)
|
||||
{
|
||||
mbox_t* mboxp;
|
||||
if (IsAtomTerm(t)) {
|
||||
if (IsAtomTerm(t=Deref(t))) {
|
||||
Atom at = AtomOfTerm(t);
|
||||
if (IsBlob(at)) {
|
||||
mboxp = &(((counted_mbox *)(RepAtom(at)->rep.blob[0].data))->mbox);
|
||||
LOCK(GLOBAL_mboxq_lock);
|
||||
} else {
|
||||
LOCK(GLOBAL_mboxq_lock);
|
||||
mboxp = GLOBAL_named_mboxes;
|
||||
while( mboxp && mboxp->name != t) {
|
||||
mboxp = mboxp->next;
|
||||
}
|
||||
}
|
||||
} else if (IsIntTerm(t)) {
|
||||
int wid = IntOfTerm(t);
|
||||
if (REMOTE(wid) &&
|
||||
@@ -1059,9 +1114,22 @@ p_mbox_send( USES_REGS1 )
|
||||
Term namet = Deref(ARG1);
|
||||
mbox_t* mboxp = getMbox(namet) ;
|
||||
|
||||
if (!mboxp)
|
||||
return FALSE;
|
||||
return mboxSend(mboxp, Deref(ARG2) PASS_REGS);
|
||||
}
|
||||
|
||||
static Int
|
||||
p_mbox_size( USES_REGS1 )
|
||||
{
|
||||
Term namet = Deref(ARG1);
|
||||
mbox_t* mboxp = getMbox(namet) ;
|
||||
|
||||
if (!mboxp)
|
||||
return FALSE;
|
||||
return Yap_unify( ARG2, MkIntTerm(mboxp->nclients));
|
||||
}
|
||||
|
||||
|
||||
static Int
|
||||
p_mbox_receive( USES_REGS1 )
|
||||
@@ -1069,6 +1137,8 @@ p_mbox_receive( USES_REGS1 )
|
||||
Term namet = Deref(ARG1);
|
||||
mbox_t* mboxp = getMbox(namet) ;
|
||||
|
||||
if (!mboxp)
|
||||
return FALSE;
|
||||
return mboxReceive(mboxp, Deref(ARG2) PASS_REGS);
|
||||
}
|
||||
|
||||
@@ -1079,6 +1149,8 @@ p_mbox_peek( USES_REGS1 )
|
||||
Term namet = Deref(ARG1);
|
||||
mbox_t* mboxp = getMbox(namet) ;
|
||||
|
||||
if (!mboxp)
|
||||
return FALSE;
|
||||
return mboxPeek(mboxp, Deref(ARG2) PASS_REGS);
|
||||
}
|
||||
|
||||
@@ -1339,6 +1411,8 @@ and succeeds silently.
|
||||
Yap_InitCPred("$lock_mutex", 1, p_lock_mutex, SafePredFlag);
|
||||
Yap_InitCPred("$trylock_mutex", 1, p_trylock_mutex, SafePredFlag);
|
||||
Yap_InitCPred("$unlock_mutex", 1, p_unlock_mutex, SafePredFlag);
|
||||
Yap_InitCPred("$with_with_mutex", 1, p_with_with_mutex, 0);
|
||||
Yap_InitCPred("$unlock_with_mutex", 1, p_unlock_with_mutex, 0);
|
||||
Yap_InitCPred("$mutex_info", 3, p_mutex_info, SafePredFlag);
|
||||
Yap_InitCPred("$cond_create", 1, p_cond_create, SafePredFlag);
|
||||
Yap_InitCPred("$cond_destroy", 1, p_cond_destroy, SafePredFlag);
|
||||
@@ -1349,6 +1423,7 @@ and succeeds silently.
|
||||
Yap_InitCPred("$message_queue_destroy", 1, p_mbox_destroy, SafePredFlag);
|
||||
Yap_InitCPred("$message_queue_send", 2, p_mbox_send, SafePredFlag);
|
||||
Yap_InitCPred("$message_queue_receive", 2, p_mbox_receive, SafePredFlag);
|
||||
Yap_InitCPred("$message_queue_size", 2, p_mbox_size, SafePredFlag);
|
||||
Yap_InitCPred("$message_queue_peek", 2, p_mbox_peek, SafePredFlag);
|
||||
Yap_InitCPred("$thread_stacks", 4, p_thread_stacks, SafePredFlag);
|
||||
Yap_InitCPred("$signal_thread", 1, p_thread_signal, SafePredFlag);
|
||||
|
Reference in New Issue
Block a user