try to make MT more robust by:

- tracking memory overflows (still worried about blobs)
- moving core components to C-code, namely the mailboxes.
- reducing locking: too many locks == deadlock.
thanks to Paulo Moura for the test suite!
This commit is contained in:
Vítor Santos Costa 2014-10-19 01:54:57 +01:00
parent a351e1f38f
commit 53a8a8f7c9
7 changed files with 201 additions and 93 deletions

View File

@ -238,10 +238,10 @@ OpDec(int p, const char *type, Atom a, Term m)
info->KindOfPE = Ord(OpProperty);
info->OpModule = m;
info->OpName = a;
LOCK(OpListLock);
//LOCK(OpListLock);
info->OpNext = OpList;
OpList = info;
UNLOCK(OpListLock);
//UNLOCK(OpListLock);
AddPropToAtom(ae, (PropEntry *)info);
INIT_RWLOCK(info->OpRWLock);
WRITE_LOCK(info->OpRWLock);
@ -1196,6 +1196,8 @@ InitThreadHandle(int wid)
pthread_mutex_init(mutexp, NULL);
msgsp = & mboxp->msgs;
mboxp->nmsgs = 0;
mboxp->nclients = 0;
mboxp->open = true;
Yap_init_tqueue(msgsp);
}

View File

@ -50,13 +50,12 @@ PL_blob_t PL_Message_Queue = {
};
#if DEBUG_LOCKS
#if DEBUG_LOCKS||DEBUG_PE_LOCKS
int debug_locks = TRUE;
int debug_locks = FALSE, debug_pe_locks = FALSE;
static Int p_debug_locks( USES_REGS1 ) { debugf=stdout; debug_pe_locks = 1; return TRUE; }
static Int p_debug_locks( USES_REGS1 ) { debug_locks = 1; return TRUE; }
static Int p_nodebug_locks( USES_REGS1 ) { debug_locks = 0; return TRUE; }
static Int p_nodebug_locks( USES_REGS1 ) { debug_locks = 0; debug_pe_locks = 0; return TRUE; }
#endif
@ -135,6 +134,7 @@ allocate_new_tid(void)
MUTEX_LOCK(&(REMOTE_ThreadHandle(new_worker_id).tlock));
REMOTE_ThreadHandle(new_worker_id).in_use = TRUE;
} else if (new_worker_id < MAX_THREADS) {
// reuse existing thread
MUTEX_LOCK(&(REMOTE_ThreadHandle(new_worker_id).tlock));
REMOTE_ThreadHandle(new_worker_id).in_use = TRUE;
} else {
@ -153,7 +153,6 @@ mboxCreate( Term namet, mbox_t *mboxp USES_REGS )
struct idb_queue *msgsp;
memset(mboxp, 0, sizeof(mbox_t));
UNLOCK(GLOBAL_mboxq_lock);
condp = & mboxp->cond;
pthread_cond_init(condp, NULL);
mutexp = & mboxp->mutex;
@ -164,7 +163,8 @@ mboxCreate( Term namet, mbox_t *mboxp USES_REGS )
Yap_init_tqueue(msgsp);
// match at the end, when everything is built.
mboxp->name = namet;
return true;
mboxp->open = true;
return true;
}
static bool
@ -173,8 +173,8 @@ mboxDestroy( mbox_t *mboxp USES_REGS )
pthread_mutex_t *mutexp = &mboxp->mutex;
pthread_cond_t *condp = &mboxp->cond;
struct idb_queue *msgsp = &mboxp->msgs;
if (mboxp->nclients == 0) {
mboxp->open = false;
if (mboxp->nclients == 0 ) {
pthread_cond_destroy(condp);
pthread_mutex_destroy(mutexp);
Yap_destroy_tqueue(msgsp PASS_REGS);
@ -182,9 +182,7 @@ mboxDestroy( mbox_t *mboxp USES_REGS )
return true;
} else {
/* we have clients in the mailbox, try to wake them up one by one */
if (mboxp->nclients > 0)
mboxp->nclients = - mboxp->nclients;
pthread_cond_signal(condp);
pthread_cond_broadcast(condp);
pthread_mutex_unlock(mutexp);
return true;
}
@ -197,11 +195,12 @@ mboxSend( mbox_t *mboxp, Term t USES_REGS )
pthread_cond_t *condp = &mboxp->cond;
struct idb_queue *msgsp = &mboxp->msgs;
if (mboxp->nclients < 0) {
if (!mboxp->open) {
// oops, dead mailbox
return false;
}
Yap_enqueue_tqueue(msgsp, t PASS_REGS);
// printf("+ (%d) %d/%d\n", worker_id,mboxp->nclients, mboxp->nmsgs);
mboxp->nmsgs++;
pthread_cond_broadcast(condp);
pthread_mutex_unlock(mutexp);
@ -214,24 +213,34 @@ mboxReceive( mbox_t *mboxp, Term t USES_REGS )
pthread_mutex_t *mutexp = &mboxp->mutex;
pthread_cond_t *condp = &mboxp->cond;
struct idb_queue *msgsp = &mboxp->msgs;
bool rc;
bool rc;
if (mboxp->nclients >= 0) {
mboxp->nclients++;
} else {
return false; // don't try to read if someone else already closed down...
if (!mboxp->open){
return false; // don't try to read if someone else already closed down...
}
mboxp->nclients++;
do {
rc = Yap_dequeue_tqueue(msgsp, t, false, true PASS_REGS);
rc = mboxp->nmsgs && Yap_dequeue_tqueue(msgsp, t, false, true PASS_REGS);
if (rc) {
mboxp->nmsgs--;
if (mboxp->nclients > 0) {
mboxp->nclients--;
} else {
mboxp->nclients++;
return mboxDestroy( mboxp PASS_REGS);
}
mboxp->nclients--;
mboxp->nmsgs--;
//printf("- (%d) %d/%d\n", worker_id,mboxp->nclients, mboxp->nmsgs);
// Yap_do_low_level_trace=1;
pthread_mutex_unlock(mutexp);
return true;
} else if (!mboxp->open) {
//printf("o (%d)\n", worker_id);
mboxp->nclients--;
if (!mboxp->nclients) {// release
pthread_cond_destroy(condp);
pthread_mutex_destroy(mutexp);
Yap_destroy_tqueue(msgsp PASS_REGS);
// at this point, there is nothing left to unlock!
} else {
pthread_cond_broadcast(condp);
pthread_mutex_unlock(mutexp);
}
return false;
} else {
pthread_cond_wait(condp, mutexp);
}
@ -361,6 +370,7 @@ kill_thread_engine (int wid, int always_die)
REMOTE_ThreadHandle(wid).default_yaam_regs = NULL;
LOCK(GLOBAL_ThreadHandlesLock);
GLOBAL_NOfThreads--;
UNLOCK(GLOBAL_ThreadHandlesLock);
MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock));
if (REMOTE_ThreadHandle(wid).tdetach == MkAtomTerm(AtomTrue) ||
always_die) {
@ -368,7 +378,6 @@ kill_thread_engine (int wid, int always_die)
REMOTE_ThreadHandle(wid).in_use = FALSE;
}
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
UNLOCK(GLOBAL_ThreadHandlesLock);
}
static void
@ -741,24 +750,26 @@ static Int
p_thread_join( USES_REGS1 )
{
Int tid = IntegerOfTerm(Deref(ARG1));
pthread_t thread;
LOCK(GLOBAL_ThreadHandlesLock);
if (!REMOTE_ThreadHandle(tid).in_use &&
!REMOTE_ThreadHandle(tid).zombie) {
UNLOCK(GLOBAL_ThreadHandlesLock);
MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock));
if (!(REMOTE_ThreadHandle(tid).in_use ||
REMOTE_ThreadHandle(tid).zombie)) {
// he's dead, jim
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
return FALSE;
}
if (!REMOTE_ThreadHandle(tid).tdetach == MkAtomTerm(AtomTrue)) {
UNLOCK(GLOBAL_ThreadHandlesLock);
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
return FALSE;
}
UNLOCK(GLOBAL_ThreadHandlesLock);
thread = REMOTE_ThreadHandle(tid).pthread_handle;
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
/* make sure this lock is accessible */
if (pthread_join(REMOTE_ThreadHandle(tid).pthread_handle, NULL) < 0) {
if (pthread_join(thread, NULL) < 0) {
/* ERROR */
return FALSE;
}
/* notice mutex is already locked */
return TRUE;
}
@ -767,12 +778,10 @@ p_thread_destroy( USES_REGS1 )
{
Int tid = IntegerOfTerm(Deref(ARG1));
LOCK(GLOBAL_ThreadHandlesLock);
MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock));
REMOTE_ThreadHandle(tid).zombie = FALSE;
REMOTE_ThreadHandle(tid).in_use = FALSE;
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
UNLOCK(GLOBAL_ThreadHandlesLock);
return TRUE;
}
@ -955,15 +964,88 @@ p_unlock_mutex( USES_REGS1 )
return TRUE;
}
static Int
p_with_mutex( USES_REGS1 )
{
SWIMutex *mut;
Term t1 = Deref(ARG1), excep;
Int rc = FALSE;
Int creeping = Yap_get_signal(YAP_CREEP_SIGNAL);
PredEntry *pe;
Term tm = CurrentModule;
Term tg = Deref(ARG2);
if (IsVarTerm(t1)) {
p_new_mutex( PASS_REGS1 );
t1 = Deref(ARG1);
}
mut = (SWIMutex*)IntegerOfTerm(t1);
if (!p_lock_mutex( PASS_REGS1 )) {
return FALSE;
}
tg = Yap_StripModule(tg, &tm);
if (IsVarTerm(tg)) {
Yap_Error(INSTANTIATION_ERROR, ARG2, "with_mutex/2");
goto end;
} else if (IsApplTerm(tg)) {
register Functor f = FunctorOfTerm(tg);
register CELL *pt;
size_t i, arity;
f = FunctorOfTerm(tg);
if (IsExtensionFunctor(f)) {
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
goto end;
}
arity = ArityOfFunctor(f);
if (arity > MaxTemps) {
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
goto end;
}
pe = RepPredProp(PredPropByFunc(f, tm));
pt = RepAppl(tg)+1;
for (i= 0; i < arity; i++ )
XREGS[i+1] = pt[i];
} else if (IsAtomTerm(tg)) {
pe = RepPredProp(PredPropByAtom(AtomOfTerm(tg), tm));
} else if (IsPairTerm(tg)) {
register CELL *pt;
Functor f;
f = FunctorDot;
pe = RepPredProp(PredPropByFunc(f, tm));
pt = RepPair(tg);
XREGS[1] = pt[0];
XREGS[2] = pt[1];
} else {
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
goto end;
}
if (
pe->OpcodeOfPred != FAIL_OPCODE &&
Yap_execute_pred(pe, NULL PASS_REGS) ) {
rc = TRUE;
}
end:
ARG1 = MkIntegerTerm((Int)mut);
excep = Yap_GetException();
p_unlock_mutex( PASS_REGS1 );
if (creeping) {
Yap_signal( YAP_CREEP_SIGNAL );
} else if ( excep != 0) {
return Yap_JumpToEnv(excep);
}
return rc;
}
static Int
p_with_with_mutex( USES_REGS1 )
{
if (GLOBAL_WithMutex == NULL) {
Term t = ARG1;
LOCK(GLOBAL_ThreadHandlesLock);
p_new_mutex( PASS_REGS1 );
GLOBAL_WithMutex = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
UNLOCK(GLOBAL_ThreadHandlesLock);
} else {
ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex);
}
@ -1015,13 +1097,14 @@ p_mbox_create( USES_REGS1 )
if (IsVarTerm(namet)) {
AtomEntry *ae;
counted_mbox c;
int new;
c.indx = GLOBAL_mbox_count++;
ae = Yap_lookupBlob(&c, sizeof(c), &PL_Message_Queue, &new);
mbox_t mbox;
ae = Yap_lookupBlob(&mbox, sizeof(mbox), &PL_Message_Queue, &new);
namet = MkAtomTerm(RepAtom(ae));
mboxp = &(((counted_mbox *)(ae->rep.blob[0].data))->mbox);
mboxp = (mbox_t *)(ae->rep.blob[0].data);
Yap_unify(ARG1, namet);
LOCK(GLOBAL_mboxq_lock);
} else if (IsAtomTerm(namet)) {
LOCK(GLOBAL_mboxq_lock);
while( mboxp && mboxp->name != namet)
@ -1039,7 +1122,9 @@ p_mbox_create( USES_REGS1 )
mboxp->next = GLOBAL_named_mboxes;
GLOBAL_named_mboxes = mboxp;
}
return mboxCreate( namet, mboxp PASS_REGS );
bool rc = mboxCreate( namet, mboxp PASS_REGS );
UNLOCK(GLOBAL_mboxq_lock);
return rc;
}
static Int
@ -1078,18 +1163,24 @@ static mbox_t*
getMbox(Term t)
{
mbox_t* mboxp;
if (IsAtomTerm(t=Deref(t))) {
Atom at = AtomOfTerm(t);
LOCK(GLOBAL_mboxq_lock);
if (IsBlob(at)) {
mboxp = &(((counted_mbox *)(RepAtom(at)->rep.blob[0].data))->mbox);
LOCK(GLOBAL_mboxq_lock);
mboxp = (mbox_t *)(RepAtom(at)->rep.blob[0].data);
} else {
LOCK(GLOBAL_mboxq_lock);
mboxp = GLOBAL_named_mboxes;
while( mboxp && mboxp->name != t) {
mboxp = mboxp->next;
}
}
if (!mboxp->open)
mboxp = NULL;
if (mboxp) {
pthread_mutex_lock(& mboxp->mutex);
}
UNLOCK(GLOBAL_mboxq_lock);
} else if (IsIntTerm(t)) {
int wid = IntOfTerm(t);
if (REMOTE(wid) &&
@ -1099,13 +1190,14 @@ getMbox(Term t)
} else {
return NULL;
}
if (!mboxp->open)
mboxp = NULL;
if (mboxp) {
pthread_mutex_lock(& mboxp->mutex);
}
} else {
return NULL;
}
if (mboxp) {
pthread_mutex_lock(& mboxp->mutex);
}
UNLOCK(GLOBAL_mboxq_lock);
return mboxp;
}
@ -1129,7 +1221,7 @@ p_mbox_size( USES_REGS1 )
if (!mboxp)
return FALSE;
return Yap_unify( ARG2, MkIntTerm(mboxp->nclients));
return Yap_unify( ARG2, MkIntTerm(mboxp->nmsgs));
}
@ -1202,16 +1294,16 @@ p_thread_stacks( USES_REGS1 )
Int tid = IntegerOfTerm(Deref(ARG1));
Int status= TRUE;
LOCK(GLOBAL_ThreadHandlesLock);
MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock));
if (REMOTE(tid) &&
(REMOTE_ThreadHandle(tid).in_use || REMOTE_ThreadHandle(tid).zombie)) {
status &= Yap_unify(ARG2,MkIntegerTerm(REMOTE_ThreadHandle(tid).ssize));
status &= Yap_unify(ARG3,MkIntegerTerm(REMOTE_ThreadHandle(tid).tsize));
status &= Yap_unify(ARG4,MkIntegerTerm(REMOTE_ThreadHandle(tid).sysize));
UNLOCK(GLOBAL_ThreadHandlesLock);
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
return status;
}
UNLOCK(GLOBAL_ThreadHandlesLock);
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
return FALSE;
}
@ -1262,6 +1354,7 @@ p_thread_signal( USES_REGS1 )
return TRUE;
}
Yap_external_signal( wid, YAP_ITI_SIGNAL );
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
return TRUE;
}
@ -1413,6 +1506,7 @@ and succeeds silently.
Yap_InitCPred("$lock_mutex", 1, p_lock_mutex, SafePredFlag);
Yap_InitCPred("$trylock_mutex", 1, p_trylock_mutex, SafePredFlag);
Yap_InitCPred("$unlock_mutex", 1, p_unlock_mutex, SafePredFlag);
Yap_InitCPred("$with_mutex", 2, p_with_mutex, MetaPredFlag);
Yap_InitCPred("$with_with_mutex", 1, p_with_with_mutex, 0);
Yap_InitCPred("$unlock_with_mutex", 1, p_unlock_with_mutex, 0);
Yap_InitCPred("$mutex_info", 3, p_mutex_info, SafePredFlag);
@ -1436,7 +1530,7 @@ and succeeds silently.
Yap_InitCPred("$thread_self_lock", 1, p_thread_self_lock, SafePredFlag);
Yap_InitCPred("$thread_run_at_exit", 2, p_thread_atexit, SafePredFlag);
Yap_InitCPred("$thread_unlock", 1, p_thread_unlock, SafePredFlag);
#if DEBUG_LOCKS
#if DEBUG_LOCKS||DEBUG_PE_LOCKS
Yap_InitCPred("debug_locks", 0, p_debug_locks, SafePredFlag);
Yap_InitCPred("nodebug_locks", 0, p_nodebug_locks, SafePredFlag);
#endif
@ -1525,7 +1619,7 @@ void Yap_InitThreadPreds(void)
Yap_InitCPred("$thread_stacks", 4, p_thread_stacks, SafePredFlag);
Yap_InitCPred("$thread_runtime", 1, p_thread_runtime, SafePredFlag);
Yap_InitCPred("$thread_unlock", 1, p_thread_unlock, SafePredFlag);
#if DEBUG_LOCKS
#if DEBUG_LOCKS||DEBUG_PE_LOCKS
Yap_InitCPred("debug_locks", 0, p_debug_locks, SafePredFlag);
Yap_InitCPred("nodebug_locks", 0, p_nodebug_locks, SafePredFlag);
#endif

View File

@ -316,7 +316,7 @@ wrputf(Float f, struct write_globs *wglb) /* writes a float */
localeconv()->decimal_point;
size_t l1 = strlen((const char *)decimalpoint+1);
#else
const unsigned char *decimalpoint = ".";
const unsigned char decimalpoint[2] = ".";
size_t l1 = 0;
#endif

View File

@ -682,6 +682,7 @@ typedef struct thread_mbox {
pthread_cond_t cond;
struct idb_queue msgs;
int nmsgs, nclients; // if nclients < 0 mailbox has been closed.
bool open;
struct thread_mbox *next;
} mbox_t;

View File

@ -1790,8 +1790,8 @@ PredPropByAtomAndMod (Atom at, Term cur_mod)
#define UNLOCKPE(I,Z) \
( (Z)->StatisticsForPred.NOfRetries=(I), UNLOCK((Z)->PELock) )
#else
#define PELOCK(I,Z) LOCK((Z)->PELock)
#define UNLOCKPE(I,Z) UNLOCK((Z)->PELock)
#define PELOCK(I,Z) (LOCK((Z)->PELock))
#define UNLOCKPE(I,Z) (UNLOCK((Z)->PELock))
#endif
INLINE_ONLY EXTERN inline void AddPropToAtom(AtomEntry *, PropEntry *p);

View File

@ -96,8 +96,8 @@ control when threads can be created.
We assume id == L_THREAD is optimized away if id is known at
compile-time
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
#define IF_MT(id, g) if ( id == L_THREAD ) g
#define IF_MT(id,g) g
//#define IF_MT(id, g) if ( id == L_THREAD ) g
#ifdef O_CONTENTION_STATISTICS
#define countingMutexLock(cm) \

View File

@ -957,6 +957,7 @@ mutex_destroy(Mutex) :-
erase(R),
fail.
'$erase_mutex_info'(_).
/** @pred mutex_lock(+ _MutexId_)
@ -990,7 +991,7 @@ mutex_lock(A) :-
mutex_create(A),
mutex_lock(A).
mutex_lock(V) :-
'$do_error'(type_error(atom,V),mutex_lock(V)).
'$do_error'(type_error(mutex,V),mutex_lock(V)).
/** @pred mutex_trylock(+ _MutexId_)
@ -1011,7 +1012,7 @@ mutex_trylock(A) :-
mutex_create(A),
mutex_trylock(A).
mutex_trylock(V) :-
'$do_error'(type_error(atom,V),mutex_trylock(V)).
'$do_error'(type_error(mutex,V),mutex_trylock(V)).
/** @pred mutex_unlock(+ _MutexId_)
@ -1073,37 +1074,39 @@ once/1.
*/
with_mutex(M, G) :-
( recorded('$mutex_alias',[Id|M],_) ->
'$with_mutex'(Id, G )
;
'$atom'(M ) ->
mutex_create(Id, [alias(M)]),
'$with_mutex'(M, G )
;
integer(M) ->
'$with_mutex'(M, G )
;
'$do_error'(type_error(mutex,M), with_mutex(M, G))
), nonvar(G). % preserve env.
/*
with_mutex(M, G) :-
( '$no_threads' ->
once(G)
;
var(M) ->
'$do_error'(instantiation_error,with_mutex(M, G))
;
var(G) ->
'$do_error'(instantiation_error,with_mutex(M, G))
mutex_lock(M),
var(G) -> mutex_unlock(M), '$do_error'(instantiation_error,with_mutex(M, G))
;
\+ callable(G) ->
'$do_error'(type_error(callable,G),with_mutex(M, G))
mutex_unlock(M), '$do_error'(type_error(callable,G),with_mutex(M, G))
;
atom(M) ->
'$with_with_mutex'(WMId),
( recorded('$mutex_alias',[Id|M],_) ->
true
; '$new_mutex'(Id),
recorda('$mutex_alias',[Id|M],_)
),
'$lock_mutex'(Id),
'$unlock_with_mutex'(WMId),
( catch('$execute'(G), E, ('$unlock_mutex'(Id), throw(E))) ->
'$unlock_mutex'(Id)
; '$unlock_mutex'(Id),
catch('$execute'(G), E, (mutex_unlock(M), throw(E))) ->
mutex_unlock(M)
; mutex_unlock(M),
fail
)
;
'$do_error'(type_error(atom,M),with_mutex(M, G))
).
*/
/** @pred current_mutex(? _MutexId_, ? _ThreadId_, ? _Count_)
@ -1116,8 +1119,8 @@ Enumerates all existing mutexes. If the mutex is held by some thread,
*/
current_mutex(M, T, NRefs) :-
recorded('$mutex_alias',[Id|M],_),
'$mutex_info'(Id, NRefs, T).
recorded('$mutex_alias',[Id|M],_),
'$mutex_info'(Id, NRefs, T).
mutex_property(Mutex, Prop) :-
( nonvar(Mutex) ->
@ -1220,9 +1223,17 @@ anonymous message queues, may try to wait for later.
message_queue_destroy(Name) :-
var(Name), !,
'$do_error'(instantiation_error,message_queue_destroy(Name)).
message_queue_destroy(Alias) :-
recorded('$thread_alias', [Id|Alias], Ref),
atom(Id), !,
'$message_queue_destroy'(Id),
erase(Ref).
message_queue_destroy(Name) :-
'$message_queue_destroy'(Name),
fail.
atom(Name),
'$message_queue_destroy'(Name),
recorded('$thread_alias', [Name|_Alias], Ref),
erase(Ref),
fail.
message_queue_destroy(_).
/* @pred message_queue_property(+ _Queue_)