diff --git a/C/init.c b/C/init.c index 0676716e3..7f1d76be7 100755 --- a/C/init.c +++ b/C/init.c @@ -238,10 +238,10 @@ OpDec(int p, const char *type, Atom a, Term m) info->KindOfPE = Ord(OpProperty); info->OpModule = m; info->OpName = a; - LOCK(OpListLock); + //LOCK(OpListLock); info->OpNext = OpList; OpList = info; - UNLOCK(OpListLock); + //UNLOCK(OpListLock); AddPropToAtom(ae, (PropEntry *)info); INIT_RWLOCK(info->OpRWLock); WRITE_LOCK(info->OpRWLock); @@ -1196,6 +1196,8 @@ InitThreadHandle(int wid) pthread_mutex_init(mutexp, NULL); msgsp = & mboxp->msgs; mboxp->nmsgs = 0; + mboxp->nclients = 0; + mboxp->open = true; Yap_init_tqueue(msgsp); } diff --git a/C/threads.c b/C/threads.c index c1f713e9c..192efa0f5 100644 --- a/C/threads.c +++ b/C/threads.c @@ -50,13 +50,12 @@ PL_blob_t PL_Message_Queue = { }; -#if DEBUG_LOCKS +#if DEBUG_LOCKS||DEBUG_PE_LOCKS -int debug_locks = TRUE; +int debug_locks = FALSE, debug_pe_locks = FALSE; +static Int p_debug_locks( USES_REGS1 ) { debugf=stdout; debug_pe_locks = 1; return TRUE; } -static Int p_debug_locks( USES_REGS1 ) { debug_locks = 1; return TRUE; } - -static Int p_nodebug_locks( USES_REGS1 ) { debug_locks = 0; return TRUE; } +static Int p_nodebug_locks( USES_REGS1 ) { debug_locks = 0; debug_pe_locks = 0; return TRUE; } #endif @@ -135,6 +134,7 @@ allocate_new_tid(void) MUTEX_LOCK(&(REMOTE_ThreadHandle(new_worker_id).tlock)); REMOTE_ThreadHandle(new_worker_id).in_use = TRUE; } else if (new_worker_id < MAX_THREADS) { + // reuse existing thread MUTEX_LOCK(&(REMOTE_ThreadHandle(new_worker_id).tlock)); REMOTE_ThreadHandle(new_worker_id).in_use = TRUE; } else { @@ -153,7 +153,6 @@ mboxCreate( Term namet, mbox_t *mboxp USES_REGS ) struct idb_queue *msgsp; memset(mboxp, 0, sizeof(mbox_t)); - UNLOCK(GLOBAL_mboxq_lock); condp = & mboxp->cond; pthread_cond_init(condp, NULL); mutexp = & mboxp->mutex; @@ -164,7 +163,8 @@ mboxCreate( Term namet, mbox_t *mboxp USES_REGS ) Yap_init_tqueue(msgsp); // match at the end, when everything is built. mboxp->name = namet; -return true; + mboxp->open = true; + return true; } static bool @@ -173,8 +173,8 @@ mboxDestroy( mbox_t *mboxp USES_REGS ) pthread_mutex_t *mutexp = &mboxp->mutex; pthread_cond_t *condp = &mboxp->cond; struct idb_queue *msgsp = &mboxp->msgs; - - if (mboxp->nclients == 0) { + mboxp->open = false; + if (mboxp->nclients == 0 ) { pthread_cond_destroy(condp); pthread_mutex_destroy(mutexp); Yap_destroy_tqueue(msgsp PASS_REGS); @@ -182,9 +182,7 @@ mboxDestroy( mbox_t *mboxp USES_REGS ) return true; } else { /* we have clients in the mailbox, try to wake them up one by one */ - if (mboxp->nclients > 0) - mboxp->nclients = - mboxp->nclients; - pthread_cond_signal(condp); + pthread_cond_broadcast(condp); pthread_mutex_unlock(mutexp); return true; } @@ -197,11 +195,12 @@ mboxSend( mbox_t *mboxp, Term t USES_REGS ) pthread_cond_t *condp = &mboxp->cond; struct idb_queue *msgsp = &mboxp->msgs; - if (mboxp->nclients < 0) { + if (!mboxp->open) { // oops, dead mailbox return false; } Yap_enqueue_tqueue(msgsp, t PASS_REGS); + // printf("+ (%d) %d/%d\n", worker_id,mboxp->nclients, mboxp->nmsgs); mboxp->nmsgs++; pthread_cond_broadcast(condp); pthread_mutex_unlock(mutexp); @@ -214,24 +213,34 @@ mboxReceive( mbox_t *mboxp, Term t USES_REGS ) pthread_mutex_t *mutexp = &mboxp->mutex; pthread_cond_t *condp = &mboxp->cond; struct idb_queue *msgsp = &mboxp->msgs; - bool rc; + bool rc; - if (mboxp->nclients >= 0) { - mboxp->nclients++; - } else { - return false; // don't try to read if someone else already closed down... + if (!mboxp->open){ + return false; // don't try to read if someone else already closed down... } + mboxp->nclients++; do { - rc = Yap_dequeue_tqueue(msgsp, t, false, true PASS_REGS); + rc = mboxp->nmsgs && Yap_dequeue_tqueue(msgsp, t, false, true PASS_REGS); if (rc) { - mboxp->nmsgs--; - if (mboxp->nclients > 0) { - mboxp->nclients--; - } else { - mboxp->nclients++; - return mboxDestroy( mboxp PASS_REGS); - } + mboxp->nclients--; + mboxp->nmsgs--; + //printf("- (%d) %d/%d\n", worker_id,mboxp->nclients, mboxp->nmsgs); + // Yap_do_low_level_trace=1; + pthread_mutex_unlock(mutexp); + return true; + } else if (!mboxp->open) { + //printf("o (%d)\n", worker_id); + mboxp->nclients--; + if (!mboxp->nclients) {// release + pthread_cond_destroy(condp); + pthread_mutex_destroy(mutexp); + Yap_destroy_tqueue(msgsp PASS_REGS); + // at this point, there is nothing left to unlock! + } else { + pthread_cond_broadcast(condp); pthread_mutex_unlock(mutexp); + } + return false; } else { pthread_cond_wait(condp, mutexp); } @@ -361,6 +370,7 @@ kill_thread_engine (int wid, int always_die) REMOTE_ThreadHandle(wid).default_yaam_regs = NULL; LOCK(GLOBAL_ThreadHandlesLock); GLOBAL_NOfThreads--; + UNLOCK(GLOBAL_ThreadHandlesLock); MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock)); if (REMOTE_ThreadHandle(wid).tdetach == MkAtomTerm(AtomTrue) || always_die) { @@ -368,7 +378,6 @@ kill_thread_engine (int wid, int always_die) REMOTE_ThreadHandle(wid).in_use = FALSE; } MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock)); - UNLOCK(GLOBAL_ThreadHandlesLock); } static void @@ -741,24 +750,26 @@ static Int p_thread_join( USES_REGS1 ) { Int tid = IntegerOfTerm(Deref(ARG1)); + pthread_t thread; - LOCK(GLOBAL_ThreadHandlesLock); - if (!REMOTE_ThreadHandle(tid).in_use && - !REMOTE_ThreadHandle(tid).zombie) { - UNLOCK(GLOBAL_ThreadHandlesLock); + MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock)); + if (!(REMOTE_ThreadHandle(tid).in_use || + REMOTE_ThreadHandle(tid).zombie)) { + // he's dead, jim + MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock)); return FALSE; } if (!REMOTE_ThreadHandle(tid).tdetach == MkAtomTerm(AtomTrue)) { - UNLOCK(GLOBAL_ThreadHandlesLock); + MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock)); return FALSE; } - UNLOCK(GLOBAL_ThreadHandlesLock); + thread = REMOTE_ThreadHandle(tid).pthread_handle; + MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock)); /* make sure this lock is accessible */ - if (pthread_join(REMOTE_ThreadHandle(tid).pthread_handle, NULL) < 0) { + if (pthread_join(thread, NULL) < 0) { /* ERROR */ return FALSE; } - /* notice mutex is already locked */ return TRUE; } @@ -767,12 +778,10 @@ p_thread_destroy( USES_REGS1 ) { Int tid = IntegerOfTerm(Deref(ARG1)); - LOCK(GLOBAL_ThreadHandlesLock); MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock)); REMOTE_ThreadHandle(tid).zombie = FALSE; REMOTE_ThreadHandle(tid).in_use = FALSE; MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock)); - UNLOCK(GLOBAL_ThreadHandlesLock); return TRUE; } @@ -955,15 +964,88 @@ p_unlock_mutex( USES_REGS1 ) return TRUE; } +static Int +p_with_mutex( USES_REGS1 ) +{ + SWIMutex *mut; + Term t1 = Deref(ARG1), excep; + Int rc = FALSE; + Int creeping = Yap_get_signal(YAP_CREEP_SIGNAL); + PredEntry *pe; + Term tm = CurrentModule; + Term tg = Deref(ARG2); + + if (IsVarTerm(t1)) { + p_new_mutex( PASS_REGS1 ); + t1 = Deref(ARG1); + } + mut = (SWIMutex*)IntegerOfTerm(t1); + if (!p_lock_mutex( PASS_REGS1 )) { + return FALSE; + } + + tg = Yap_StripModule(tg, &tm); + if (IsVarTerm(tg)) { + Yap_Error(INSTANTIATION_ERROR, ARG2, "with_mutex/2"); + goto end; + } else if (IsApplTerm(tg)) { + register Functor f = FunctorOfTerm(tg); + register CELL *pt; + size_t i, arity; + + f = FunctorOfTerm(tg); + if (IsExtensionFunctor(f)) { + Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2"); + goto end; + } + arity = ArityOfFunctor(f); + if (arity > MaxTemps) { + Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2"); + goto end; + } + pe = RepPredProp(PredPropByFunc(f, tm)); + pt = RepAppl(tg)+1; + for (i= 0; i < arity; i++ ) + XREGS[i+1] = pt[i]; + } else if (IsAtomTerm(tg)) { + pe = RepPredProp(PredPropByAtom(AtomOfTerm(tg), tm)); + } else if (IsPairTerm(tg)) { + register CELL *pt; + Functor f; + + f = FunctorDot; + pe = RepPredProp(PredPropByFunc(f, tm)); + pt = RepPair(tg); + XREGS[1] = pt[0]; + XREGS[2] = pt[1]; + } else { + Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2"); + goto end; + } + if ( + pe->OpcodeOfPred != FAIL_OPCODE && + Yap_execute_pred(pe, NULL PASS_REGS) ) { + rc = TRUE; + } + end: + ARG1 = MkIntegerTerm((Int)mut); + excep = Yap_GetException(); + p_unlock_mutex( PASS_REGS1 ); + if (creeping) { + Yap_signal( YAP_CREEP_SIGNAL ); + } else if ( excep != 0) { + return Yap_JumpToEnv(excep); + } + return rc; +} + + static Int p_with_with_mutex( USES_REGS1 ) { if (GLOBAL_WithMutex == NULL) { - Term t = ARG1; - LOCK(GLOBAL_ThreadHandlesLock); p_new_mutex( PASS_REGS1 ); GLOBAL_WithMutex = (SWIMutex*)IntegerOfTerm(Deref(ARG1)); - UNLOCK(GLOBAL_ThreadHandlesLock); } else { ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex); } @@ -1015,13 +1097,14 @@ p_mbox_create( USES_REGS1 ) if (IsVarTerm(namet)) { AtomEntry *ae; - counted_mbox c; int new; - c.indx = GLOBAL_mbox_count++; - ae = Yap_lookupBlob(&c, sizeof(c), &PL_Message_Queue, &new); + mbox_t mbox; + + ae = Yap_lookupBlob(&mbox, sizeof(mbox), &PL_Message_Queue, &new); namet = MkAtomTerm(RepAtom(ae)); - mboxp = &(((counted_mbox *)(ae->rep.blob[0].data))->mbox); + mboxp = (mbox_t *)(ae->rep.blob[0].data); Yap_unify(ARG1, namet); + LOCK(GLOBAL_mboxq_lock); } else if (IsAtomTerm(namet)) { LOCK(GLOBAL_mboxq_lock); while( mboxp && mboxp->name != namet) @@ -1039,7 +1122,9 @@ p_mbox_create( USES_REGS1 ) mboxp->next = GLOBAL_named_mboxes; GLOBAL_named_mboxes = mboxp; } - return mboxCreate( namet, mboxp PASS_REGS ); + bool rc = mboxCreate( namet, mboxp PASS_REGS ); + UNLOCK(GLOBAL_mboxq_lock); + return rc; } static Int @@ -1078,18 +1163,24 @@ static mbox_t* getMbox(Term t) { mbox_t* mboxp; + if (IsAtomTerm(t=Deref(t))) { Atom at = AtomOfTerm(t); + LOCK(GLOBAL_mboxq_lock); if (IsBlob(at)) { - mboxp = &(((counted_mbox *)(RepAtom(at)->rep.blob[0].data))->mbox); - LOCK(GLOBAL_mboxq_lock); + mboxp = (mbox_t *)(RepAtom(at)->rep.blob[0].data); } else { - LOCK(GLOBAL_mboxq_lock); mboxp = GLOBAL_named_mboxes; while( mboxp && mboxp->name != t) { mboxp = mboxp->next; } } + if (!mboxp->open) + mboxp = NULL; + if (mboxp) { + pthread_mutex_lock(& mboxp->mutex); + } + UNLOCK(GLOBAL_mboxq_lock); } else if (IsIntTerm(t)) { int wid = IntOfTerm(t); if (REMOTE(wid) && @@ -1099,13 +1190,14 @@ getMbox(Term t) } else { return NULL; } + if (!mboxp->open) + mboxp = NULL; + if (mboxp) { + pthread_mutex_lock(& mboxp->mutex); + } } else { return NULL; } - if (mboxp) { - pthread_mutex_lock(& mboxp->mutex); - } - UNLOCK(GLOBAL_mboxq_lock); return mboxp; } @@ -1129,7 +1221,7 @@ p_mbox_size( USES_REGS1 ) if (!mboxp) return FALSE; - return Yap_unify( ARG2, MkIntTerm(mboxp->nclients)); + return Yap_unify( ARG2, MkIntTerm(mboxp->nmsgs)); } @@ -1202,16 +1294,16 @@ p_thread_stacks( USES_REGS1 ) Int tid = IntegerOfTerm(Deref(ARG1)); Int status= TRUE; - LOCK(GLOBAL_ThreadHandlesLock); + MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock)); if (REMOTE(tid) && (REMOTE_ThreadHandle(tid).in_use || REMOTE_ThreadHandle(tid).zombie)) { status &= Yap_unify(ARG2,MkIntegerTerm(REMOTE_ThreadHandle(tid).ssize)); status &= Yap_unify(ARG3,MkIntegerTerm(REMOTE_ThreadHandle(tid).tsize)); status &= Yap_unify(ARG4,MkIntegerTerm(REMOTE_ThreadHandle(tid).sysize)); - UNLOCK(GLOBAL_ThreadHandlesLock); + MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock)); return status; } - UNLOCK(GLOBAL_ThreadHandlesLock); + MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock)); return FALSE; } @@ -1262,6 +1354,7 @@ p_thread_signal( USES_REGS1 ) return TRUE; } Yap_external_signal( wid, YAP_ITI_SIGNAL ); + MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock)); return TRUE; } @@ -1413,6 +1506,7 @@ and succeeds silently. Yap_InitCPred("$lock_mutex", 1, p_lock_mutex, SafePredFlag); Yap_InitCPred("$trylock_mutex", 1, p_trylock_mutex, SafePredFlag); Yap_InitCPred("$unlock_mutex", 1, p_unlock_mutex, SafePredFlag); + Yap_InitCPred("$with_mutex", 2, p_with_mutex, MetaPredFlag); Yap_InitCPred("$with_with_mutex", 1, p_with_with_mutex, 0); Yap_InitCPred("$unlock_with_mutex", 1, p_unlock_with_mutex, 0); Yap_InitCPred("$mutex_info", 3, p_mutex_info, SafePredFlag); @@ -1436,7 +1530,7 @@ and succeeds silently. Yap_InitCPred("$thread_self_lock", 1, p_thread_self_lock, SafePredFlag); Yap_InitCPred("$thread_run_at_exit", 2, p_thread_atexit, SafePredFlag); Yap_InitCPred("$thread_unlock", 1, p_thread_unlock, SafePredFlag); -#if DEBUG_LOCKS +#if DEBUG_LOCKS||DEBUG_PE_LOCKS Yap_InitCPred("debug_locks", 0, p_debug_locks, SafePredFlag); Yap_InitCPred("nodebug_locks", 0, p_nodebug_locks, SafePredFlag); #endif @@ -1525,7 +1619,7 @@ void Yap_InitThreadPreds(void) Yap_InitCPred("$thread_stacks", 4, p_thread_stacks, SafePredFlag); Yap_InitCPred("$thread_runtime", 1, p_thread_runtime, SafePredFlag); Yap_InitCPred("$thread_unlock", 1, p_thread_unlock, SafePredFlag); -#if DEBUG_LOCKS +#if DEBUG_LOCKS||DEBUG_PE_LOCKS Yap_InitCPred("debug_locks", 0, p_debug_locks, SafePredFlag); Yap_InitCPred("nodebug_locks", 0, p_nodebug_locks, SafePredFlag); #endif diff --git a/C/write.c b/C/write.c index cd8dc52a7..a12e60263 100644 --- a/C/write.c +++ b/C/write.c @@ -316,7 +316,7 @@ wrputf(Float f, struct write_globs *wglb) /* writes a float */ localeconv()->decimal_point; size_t l1 = strlen((const char *)decimalpoint+1); #else - const unsigned char *decimalpoint = "."; + const unsigned char decimalpoint[2] = "."; size_t l1 = 0; #endif diff --git a/H/Yap.h b/H/Yap.h index a9bf28041..9420205ea 100755 --- a/H/Yap.h +++ b/H/Yap.h @@ -682,6 +682,7 @@ typedef struct thread_mbox { pthread_cond_t cond; struct idb_queue msgs; int nmsgs, nclients; // if nclients < 0 mailbox has been closed. + bool open; struct thread_mbox *next; } mbox_t; diff --git a/H/Yatom.h b/H/Yatom.h index 22675062d..a3b1df57e 100755 --- a/H/Yatom.h +++ b/H/Yatom.h @@ -1790,8 +1790,8 @@ PredPropByAtomAndMod (Atom at, Term cur_mod) #define UNLOCKPE(I,Z) \ ( (Z)->StatisticsForPred.NOfRetries=(I), UNLOCK((Z)->PELock) ) #else -#define PELOCK(I,Z) LOCK((Z)->PELock) -#define UNLOCKPE(I,Z) UNLOCK((Z)->PELock) +#define PELOCK(I,Z) (LOCK((Z)->PELock)) +#define UNLOCKPE(I,Z) (UNLOCK((Z)->PELock)) #endif INLINE_ONLY EXTERN inline void AddPropToAtom(AtomEntry *, PropEntry *p); diff --git a/os/pl-thread.h b/os/pl-thread.h index d01925273..514fe1e75 100755 --- a/os/pl-thread.h +++ b/os/pl-thread.h @@ -96,8 +96,8 @@ control when threads can be created. We assume id == L_THREAD is optimized away if id is known at compile-time - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */ - -#define IF_MT(id, g) if ( id == L_THREAD ) g +#define IF_MT(id,g) g +//#define IF_MT(id, g) if ( id == L_THREAD ) g #ifdef O_CONTENTION_STATISTICS #define countingMutexLock(cm) \ diff --git a/pl/threads.yap b/pl/threads.yap index c54c906fb..6b52786ff 100644 --- a/pl/threads.yap +++ b/pl/threads.yap @@ -957,6 +957,7 @@ mutex_destroy(Mutex) :- erase(R), fail. '$erase_mutex_info'(_). + /** @pred mutex_lock(+ _MutexId_) @@ -990,7 +991,7 @@ mutex_lock(A) :- mutex_create(A), mutex_lock(A). mutex_lock(V) :- - '$do_error'(type_error(atom,V),mutex_lock(V)). + '$do_error'(type_error(mutex,V),mutex_lock(V)). /** @pred mutex_trylock(+ _MutexId_) @@ -1011,7 +1012,7 @@ mutex_trylock(A) :- mutex_create(A), mutex_trylock(A). mutex_trylock(V) :- - '$do_error'(type_error(atom,V),mutex_trylock(V)). + '$do_error'(type_error(mutex,V),mutex_trylock(V)). /** @pred mutex_unlock(+ _MutexId_) @@ -1073,37 +1074,39 @@ once/1. */ + +with_mutex(M, G) :- + ( recorded('$mutex_alias',[Id|M],_) -> + '$with_mutex'(Id, G ) + ; + '$atom'(M ) -> + mutex_create(Id, [alias(M)]), + '$with_mutex'(M, G ) + ; + integer(M) -> + '$with_mutex'(M, G ) + ; + '$do_error'(type_error(mutex,M), with_mutex(M, G)) + ), nonvar(G). % preserve env. + + +/* with_mutex(M, G) :- ( '$no_threads' -> once(G) ; - var(M) -> - '$do_error'(instantiation_error,with_mutex(M, G)) - ; - var(G) -> - '$do_error'(instantiation_error,with_mutex(M, G)) + mutex_lock(M), + var(G) -> mutex_unlock(M), '$do_error'(instantiation_error,with_mutex(M, G)) ; \+ callable(G) -> - '$do_error'(type_error(callable,G),with_mutex(M, G)) + mutex_unlock(M), '$do_error'(type_error(callable,G),with_mutex(M, G)) ; - atom(M) -> - '$with_with_mutex'(WMId), - ( recorded('$mutex_alias',[Id|M],_) -> - true - ; '$new_mutex'(Id), - recorda('$mutex_alias',[Id|M],_) - ), - '$lock_mutex'(Id), - '$unlock_with_mutex'(WMId), - ( catch('$execute'(G), E, ('$unlock_mutex'(Id), throw(E))) -> - '$unlock_mutex'(Id) - ; '$unlock_mutex'(Id), + catch('$execute'(G), E, (mutex_unlock(M), throw(E))) -> + mutex_unlock(M) + ; mutex_unlock(M), fail - ) - ; - '$do_error'(type_error(atom,M),with_mutex(M, G)) ). - +*/ /** @pred current_mutex(? _MutexId_, ? _ThreadId_, ? _Count_) @@ -1116,8 +1119,8 @@ Enumerates all existing mutexes. If the mutex is held by some thread, */ current_mutex(M, T, NRefs) :- - recorded('$mutex_alias',[Id|M],_), - '$mutex_info'(Id, NRefs, T). + recorded('$mutex_alias',[Id|M],_), + '$mutex_info'(Id, NRefs, T). mutex_property(Mutex, Prop) :- ( nonvar(Mutex) -> @@ -1220,9 +1223,17 @@ anonymous message queues, may try to wait for later. message_queue_destroy(Name) :- var(Name), !, '$do_error'(instantiation_error,message_queue_destroy(Name)). +message_queue_destroy(Alias) :- + recorded('$thread_alias', [Id|Alias], Ref), + atom(Id), !, + '$message_queue_destroy'(Id), + erase(Ref). message_queue_destroy(Name) :- - '$message_queue_destroy'(Name), - fail. + atom(Name), + '$message_queue_destroy'(Name), + recorded('$thread_alias', [Name|_Alias], Ref), + erase(Ref), + fail. message_queue_destroy(_). /* @pred message_queue_property(+ _Queue_)