From 50659967ed42ed583218d07ed70ddad38316d9c7 Mon Sep 17 00:00:00 2001 From: Vitor Santos Costa Date: Tue, 25 Nov 2014 16:41:53 +0000 Subject: [PATCH] begin of support for named mutexes. --- C/adtdefs.c | 30 +- C/threads.c | 1547 ++++++++++++++++++++++++++------------------------- H/Yatom.h | 92 ++- 3 files changed, 888 insertions(+), 781 deletions(-) diff --git a/C/adtdefs.c b/C/adtdefs.c index 38a85a8a4..fe560bf15 100755 --- a/C/adtdefs.c +++ b/C/adtdefs.c @@ -1242,7 +1242,7 @@ Yap_PutValue(Atom a, Term v) WRITE_UNLOCK(p->VRWLock); } -void +bool Yap_PutAtomTranslation(Atom a, Int i) { AtomEntry *ae = RepAtom(a); @@ -1255,7 +1255,7 @@ Yap_PutAtomTranslation(Atom a, Int i) p = (TranslationEntry *) Yap_AllocAtomSpace(sizeof(TranslationEntry)); if (p == NULL) { WRITE_UNLOCK(ae->ARWLock); - return; + return false; } p->KindOfPE = TranslationProperty; p->Translation = i; @@ -1264,6 +1264,32 @@ Yap_PutAtomTranslation(Atom a, Int i) /* take care that the lock for the property will be inited even if someone else searches for the property */ WRITE_UNLOCK(ae->ARWLock); + return true; +} + +bool +Yap_PutAtomMutex(Atom a, void * i) +{ + AtomEntry *ae = RepAtom(a); + Prop p0; + MutexEntry *p; + + WRITE_LOCK(ae->ARWLock); + p0 = GetAPropHavingLock(ae, MutexProperty); + if (p0 == NIL) { + p = (MutexEntry *) Yap_AllocAtomSpace(sizeof(MutexEntry)); + if (p == NULL) { + WRITE_UNLOCK(ae->ARWLock); + return false; + } + p->KindOfPE = MutexProperty; + p->Mutex = i; + AddPropToAtom(RepAtom(a), (PropEntry *)p); + } + /* take care that the lock for the property will be inited even + if someone else searches for the property */ + WRITE_UNLOCK(ae->ARWLock); + return true; } Term diff --git a/C/threads.c b/C/threads.c index f2aae67f9..73b383bd3 100644 --- a/C/threads.c +++ b/C/threads.c @@ -1,26 +1,26 @@ /************************************************************************* -* * -* YAP Prolog * -* * -* Yap Prolog was developed at NCCUP - Universidade do Porto * -* * -* Copyright L.Damas, V.S.Costa and Universidade do Porto 1985-1997 * -* * -************************************************************************** -* * -* File: stdpreds.c * -* Last rev: * -* mods: * -* comments: threads * -* * -*************************************************************************/ + * * + * YAP Prolog * + * * + * Yap Prolog was developed at NCCUP - Universidade do Porto * + * * + * Copyright L.Damas, V.S.Costa and Universidade do Porto 1985-1997 * + * * + ************************************************************************** + * * + * File: stdpreds.c * + * Last rev: * + * mods: * + * comments: threads * + * * + *************************************************************************/ #ifdef SCCS static char SccsId[] = "%W% %G%"; #endif /** -@ingroup Threads -@{ + @ingroup Threads + @{ */ #include "Yap.h" @@ -175,16 +175,16 @@ mboxDestroy( mbox_t *mboxp USES_REGS ) struct idb_queue *msgsp = &mboxp->msgs; mboxp->open = false; if (mboxp->nclients == 0 ) { - pthread_cond_destroy(condp); - pthread_mutex_destroy(mutexp); - Yap_destroy_tqueue(msgsp PASS_REGS); - // at this point, there is nothing left to unlock! - return true; + pthread_cond_destroy(condp); + pthread_mutex_destroy(mutexp); + Yap_destroy_tqueue(msgsp PASS_REGS); + // at this point, there is nothing left to unlock! + return true; } else { - /* we have clients in the mailbox, try to wake them up one by one */ - pthread_cond_broadcast(condp); - pthread_mutex_unlock(mutexp); - return true; + /* we have clients in the mailbox, try to wake them up one by one */ + pthread_cond_broadcast(condp); + pthread_mutex_unlock(mutexp); + return true; } } @@ -196,8 +196,8 @@ mboxSend( mbox_t *mboxp, Term t USES_REGS ) struct idb_queue *msgsp = &mboxp->msgs; if (!mboxp->open) { - // oops, dead mailbox - return false; + // oops, dead mailbox + return false; } Yap_enqueue_tqueue(msgsp, t PASS_REGS); // printf("+ (%d) %d/%d\n", worker_id,mboxp->nclients, mboxp->nmsgs); @@ -220,30 +220,30 @@ mboxReceive( mbox_t *mboxp, Term t USES_REGS ) } mboxp->nclients++; do { - rc = mboxp->nmsgs && Yap_dequeue_tqueue(msgsp, t, false, true PASS_REGS); - if (rc) { - mboxp->nclients--; - mboxp->nmsgs--; - //printf("- (%d) %d/%d\n", worker_id,mboxp->nclients, mboxp->nmsgs); - // Yap_do_low_level_trace=1; - pthread_mutex_unlock(mutexp); - return true; - } else if (!mboxp->open) { - //printf("o (%d)\n", worker_id); - mboxp->nclients--; - if (!mboxp->nclients) {// release - pthread_cond_destroy(condp); - pthread_mutex_destroy(mutexp); - Yap_destroy_tqueue(msgsp PASS_REGS); - // at this point, there is nothing left to unlock! - } else { - pthread_cond_broadcast(condp); - pthread_mutex_unlock(mutexp); - } - return false; + rc = mboxp->nmsgs && Yap_dequeue_tqueue(msgsp, t, false, true PASS_REGS); + if (rc) { + mboxp->nclients--; + mboxp->nmsgs--; + //printf("- (%d) %d/%d\n", worker_id,mboxp->nclients, mboxp->nmsgs); + // Yap_do_low_level_trace=1; + pthread_mutex_unlock(mutexp); + return true; + } else if (!mboxp->open) { + //printf("o (%d)\n", worker_id); + mboxp->nclients--; + if (!mboxp->nclients) {// release + pthread_cond_destroy(condp); + pthread_mutex_destroy(mutexp); + Yap_destroy_tqueue(msgsp PASS_REGS); + // at this point, there is nothing left to unlock! } else { - pthread_cond_wait(condp, mutexp); + pthread_cond_broadcast(condp); + pthread_mutex_unlock(mutexp); } + return false; + } else { + pthread_cond_wait(condp, mutexp); + } } while (!rc); return rc; } @@ -262,7 +262,7 @@ static int store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term tgoal, Term tdetach, Term texit) { CACHE_REGS - UInt pm; /* memory to be requested */ + UInt pm; /* memory to be requested */ Term tmod; if (tsize < MinTrailSpace) @@ -307,9 +307,9 @@ store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term tgoal, tmod = CurrentModule; texit = Yap_StripModule(Deref(texit), &tmod); if (IsAtomTerm(tmod)) { - REMOTE_ThreadHandle(new_worker_id).texit_mod = tmod; + REMOTE_ThreadHandle(new_worker_id).texit_mod = tmod; } else { - Yap_Error(TYPE_ERROR_ATOM,tmod,"module in exit call should be an atom"); + Yap_Error(TYPE_ERROR_ATOM,tmod,"module in exit call should be an atom"); } REMOTE_ThreadHandle(new_worker_id).texit = Yap_StoreTermInDB(texit,7); @@ -390,7 +390,7 @@ static int setup_engine(int myworker_id, int init_thread) { CACHE_REGS - REGSTORE *standard_regs; + REGSTORE *standard_regs; set_system_thread_id( myworker_id, NULL ); standard_regs = (REGSTORE *)calloc(1,sizeof(REGSTORE)); @@ -408,7 +408,7 @@ setup_engine(int myworker_id, int init_thread) Yap_InitTime( myworker_id ); Yap_InitYaamRegs( myworker_id ); REFRESH_CACHE_REGS - Yap_ReleasePreAllocCodeSpace(Yap_PreAllocCodeSpace()); + Yap_ReleasePreAllocCodeSpace(Yap_PreAllocCodeSpace()); /* I exist */ GLOBAL_NOfThreadsCreated++; GLOBAL_NOfThreads++; @@ -423,7 +423,7 @@ static void start_thread(int myworker_id) { CACHE_REGS - pthread_setspecific(Yap_yaamregs_key, (void *)REMOTE_ThreadHandle(myworker_id).default_yaam_regs); + pthread_setspecific(Yap_yaamregs_key, (void *)REMOTE_ThreadHandle(myworker_id).default_yaam_regs); REFRESH_CACHE_REGS; worker_id = myworker_id; LOCAL = REMOTE(myworker_id); @@ -433,7 +433,7 @@ static void * thread_run(void *widp) { CACHE_REGS - Term tgoal, t; + Term tgoal, t; Term tgs[2]; int myworker_id = *((int *)widp); #ifdef OUTPUT_THREADS_TABLING @@ -652,8 +652,8 @@ Int Yap_thread_self(void) { CACHE_REGS - if (pthread_getspecific(Yap_yaamregs_key) == NULL) - return -1; + if (pthread_getspecific(Yap_yaamregs_key) == NULL) + return -1; return worker_id; } @@ -832,13 +832,10 @@ p_thread_exit( USES_REGS1 ) static Int p_thread_set_concurrency( USES_REGS1 ) { - Term tnew = Deref(ARG2); - int newc; #if HAVE_PTHREAD_GETCONCURRENCY -int cur; -#endif - - + int newc; + int cur; + Term tnew = Deref(ARG2); if (IsVarTerm(tnew)) { newc = 0; } else if (IsIntegerTerm(tnew)) { @@ -847,7 +844,6 @@ int cur; Yap_Error(TYPE_ERROR_INTEGER,tnew,"thread_set_concurrency/2"); return(FALSE); } -#if HAVE_PTHREAD_GETCONCURRENCY cur = MkIntegerTerm(pthread_getconcurrency()); if (pthread_setconcurrency(newc) != 0) { return FALSE; @@ -906,780 +902,785 @@ p_new_mutex( USES_REGS1 ) pthread_mutex_init(&mutp->m, &mat); mutp->owners = 0; mutp->tid_own = 0; - return Yap_unify(ARG1, MkIntegerTerm((Int)mutp)); + if (IsVarTerm((t1 = Deref(ARG1)))) { + return Yap_unify(t1, MkAddressTerm(mutp)); + } else if(IsAtomTerm(t1)) { + return Yap_PutAtomMutex( AtomOfTerm(t1), mutp ); + } } - static Int - p_destroy_mutex( USES_REGS1 ) - { - SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1)); +static Int +p_destroy_mutex( USES_REGS1 ) +{ + SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1)); - if (pthread_mutex_destroy(&mut->m) < 0) - return FALSE; - Yap_FreeCodeSpace((void *)mut); - return TRUE; - } + if (pthread_mutex_destroy(&mut->m) < 0) + return FALSE; + Yap_FreeCodeSpace((void *)mut); + return TRUE; +} - static Int - p_lock_mutex( USES_REGS1 ) - { - SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1)); +static Int +p_lock_mutex( USES_REGS1 ) +{ + SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1)); - #if DEBUG_LOCKS - MUTEX_LOCK(&mut->m); - #else - if (MUTEX_LOCK(&mut->m) < 0) - return FALSE; - #endif - mut->owners++; - mut->tid_own = worker_id; - return TRUE; - } +#if DEBUG_LOCKS + MUTEX_LOCK(&mut->m); +#else + if (MUTEX_LOCK(&mut->m) < 0) + return FALSE; +#endif + mut->owners++; + mut->tid_own = worker_id; + return TRUE; +} - static Int - p_trylock_mutex( USES_REGS1 ) - { - SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1)); +static Int +p_trylock_mutex( USES_REGS1 ) +{ + SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1)); - if (MUTEX_TRYLOCK(&mut->m) == EBUSY) - return FALSE; - mut->owners++; - mut->tid_own = worker_id; - return TRUE; - } + if (MUTEX_TRYLOCK(&mut->m) == EBUSY) + return FALSE; + mut->owners++; + mut->tid_own = worker_id; + return TRUE; +} - static Int - p_unlock_mutex( USES_REGS1 ) - { - SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1)); +static Int +p_unlock_mutex( USES_REGS1 ) +{ + SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1)); - #if DEBUG_LOCKS - MUTEX_UNLOCK(&mut->m); - #else - if (MUTEX_UNLOCK(&mut->m) < 0) - return FALSE; - #endif - mut->owners--; - return TRUE; - } +#if DEBUG_LOCKS + MUTEX_UNLOCK(&mut->m); +#else + if (MUTEX_UNLOCK(&mut->m) < 0) + return FALSE; +#endif + mut->owners--; + return TRUE; +} - static Int - p_with_mutex( USES_REGS1 ) - { - SWIMutex *mut; - Term t1 = Deref(ARG1), excep; - Int rc = FALSE; - Int creeping = Yap_get_signal(YAP_CREEP_SIGNAL); - PredEntry *pe; - Term tm = CurrentModule; - Term tg = Deref(ARG2); +static Int +p_with_mutex( USES_REGS1 ) +{ + SWIMutex *mut; + Term t1 = Deref(ARG1), excep; + Int rc = FALSE; + Int creeping = Yap_get_signal(YAP_CREEP_SIGNAL); + PredEntry *pe; + Term tm = CurrentModule; + Term tg = Deref(ARG2); - if (IsVarTerm(t1)) { - p_new_mutex( PASS_REGS1 ); - t1 = Deref(ARG1); - } - mut = (SWIMutex*)IntegerOfTerm(t1); - if (!p_lock_mutex( PASS_REGS1 )) { - return FALSE; - } + if (IsVarTerm(t1)) { + p_new_mutex( PASS_REGS1 ); + t1 = Deref(ARG1); + } + mut = (SWIMutex*)IntegerOfTerm(t1); + if (!p_lock_mutex( PASS_REGS1 )) { + return FALSE; + } - tg = Yap_StripModule(tg, &tm); - if (IsVarTerm(tg)) { - Yap_Error(INSTANTIATION_ERROR, ARG2, "with_mutex/2"); - goto end; - } else if (IsApplTerm(tg)) { - register Functor f = FunctorOfTerm(tg); - register CELL *pt; - size_t i, arity; + tg = Yap_StripModule(tg, &tm); + if (IsVarTerm(tg)) { + Yap_Error(INSTANTIATION_ERROR, ARG2, "with_mutex/2"); + goto end; + } else if (IsApplTerm(tg)) { + register Functor f = FunctorOfTerm(tg); + register CELL *pt; + size_t i, arity; f = FunctorOfTerm(tg); - if (IsExtensionFunctor(f)) { - Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2"); - goto end; - } - arity = ArityOfFunctor(f); - if (arity > MaxTemps) { - Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2"); - goto end; - } - pe = RepPredProp(PredPropByFunc(f, tm)); - pt = RepAppl(tg)+1; - for (i= 0; i < arity; i++ ) - XREGS[i+1] = pt[i]; - } else if (IsAtomTerm(tg)) { - pe = RepPredProp(PredPropByAtom(AtomOfTerm(tg), tm)); - } else if (IsPairTerm(tg)) { - register CELL *pt; - Functor f; + if (IsExtensionFunctor(f)) { + Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2"); + goto end; + } + arity = ArityOfFunctor(f); + if (arity > MaxTemps) { + Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2"); + goto end; + } + pe = RepPredProp(PredPropByFunc(f, tm)); + pt = RepAppl(tg)+1; + for (i= 0; i < arity; i++ ) + XREGS[i+1] = pt[i]; + } else if (IsAtomTerm(tg)) { + pe = RepPredProp(PredPropByAtom(AtomOfTerm(tg), tm)); + } else if (IsPairTerm(tg)) { + register CELL *pt; + Functor f; - f = FunctorDot; - pe = RepPredProp(PredPropByFunc(f, tm)); - pt = RepPair(tg); - XREGS[1] = pt[0]; - XREGS[2] = pt[1]; - } else { - Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2"); - goto end; - } - if ( - pe->OpcodeOfPred != FAIL_OPCODE && - Yap_execute_pred(pe, NULL PASS_REGS) ) { - rc = TRUE; - } - end: - ARG1 = MkIntegerTerm((Int)mut); - excep = Yap_GetException(); - p_unlock_mutex( PASS_REGS1 ); - if (creeping) { - Yap_signal( YAP_CREEP_SIGNAL ); - } else if ( excep != 0) { - return Yap_JumpToEnv(excep); - } - return rc; - } + f = FunctorDot; + pe = RepPredProp(PredPropByFunc(f, tm)); + pt = RepPair(tg); + XREGS[1] = pt[0]; + XREGS[2] = pt[1]; + } else { + Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2"); + goto end; + } + if ( + pe->OpcodeOfPred != FAIL_OPCODE && + Yap_execute_pred(pe, NULL PASS_REGS) ) { + rc = TRUE; + } + end: + ARG1 = MkIntegerTerm((Int)mut); + excep = Yap_GetException(); + p_unlock_mutex( PASS_REGS1 ); + if (creeping) { + Yap_signal( YAP_CREEP_SIGNAL ); + } else if ( excep != 0) { + return Yap_JumpToEnv(excep); + } + return rc; +} - static Int - p_with_with_mutex( USES_REGS1 ) - { - if (GLOBAL_WithMutex == NULL) { - p_new_mutex( PASS_REGS1 ); - GLOBAL_WithMutex = (SWIMutex*)IntegerOfTerm(Deref(ARG1)); - } else { - ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex); - } - return p_lock_mutex( PASS_REGS1 ); - } +static Int +p_with_with_mutex( USES_REGS1 ) +{ + if (GLOBAL_WithMutex == NULL) { + p_new_mutex( PASS_REGS1 ); + GLOBAL_WithMutex = (SWIMutex*)IntegerOfTerm(Deref(ARG1)); + } else { + ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex); + } + return p_lock_mutex( PASS_REGS1 ); +} - static Int - p_unlock_with_mutex( USES_REGS1 ) - { - ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex); - return p_unlock_mutex( PASS_REGS1 ); - } +static Int +p_unlock_with_mutex( USES_REGS1 ) +{ + ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex); + return p_unlock_mutex( PASS_REGS1 ); +} - static Int - p_mutex_info( USES_REGS1 ) - { - SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1)); +static Int +p_mutex_info( USES_REGS1 ) +{ + SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1)); - return Yap_unify(ARG2, MkIntegerTerm(mut->owners)) && - Yap_unify(ARG3, MkIntegerTerm(mut->tid_own)); - return TRUE; - } - - static Int - p_cond_create( USES_REGS1 ) - { - pthread_cond_t* condp; - - condp = (pthread_cond_t *)Yap_AllocCodeSpace(sizeof(pthread_cond_t)); - if (condp == NULL) { - return FALSE; - } - pthread_cond_init(condp, NULL); - return Yap_unify(ARG1, MkIntegerTerm((Int)condp)); - } - - typedef struct { - UInt indx; - mbox_t mbox; - } counted_mbox; - - static Int - p_mbox_create( USES_REGS1 ) - { - Term namet = Deref(ARG1); - mbox_t* mboxp = GLOBAL_named_mboxes; - - if (IsVarTerm(namet)) { - AtomEntry *ae; - int new; - mbox_t mbox; - - ae = Yap_lookupBlob(&mbox, sizeof(mbox), &PL_Message_Queue, &new); - namet = MkAtomTerm(RepAtom(ae)); - mboxp = (mbox_t *)(ae->rep.blob[0].data); - Yap_unify(ARG1, namet); - LOCK(GLOBAL_mboxq_lock); - } else if (IsAtomTerm(namet)) { - LOCK(GLOBAL_mboxq_lock); - while( mboxp && mboxp->name != namet) - mboxp = mboxp->next; - if (mboxp) { - UNLOCK(GLOBAL_mboxq_lock); - return FALSE; - } - mboxp = (mbox_t *)Yap_AllocCodeSpace(sizeof(mbox_t)); - if (mboxp == NULL) { - UNLOCK(GLOBAL_mboxq_lock); - return FALSE; - } - // global mbox, for now we'll just insert in list - mboxp->next = GLOBAL_named_mboxes; - GLOBAL_named_mboxes = mboxp; - } - bool rc = mboxCreate( namet, mboxp PASS_REGS ); - UNLOCK(GLOBAL_mboxq_lock); - return rc; - } - - static Int - p_mbox_destroy( USES_REGS1 ) - { - Term namet = Deref(ARG1); - mbox_t* mboxp = GLOBAL_named_mboxes, *prevp; - - if (IsVarTerm(namet) ) - return FALSE; - if (IsIntTerm(namet) ) { - return FALSE; - } - LOCK(GLOBAL_mboxq_lock); - prevp = NULL; - while( mboxp && mboxp->name != namet) { - prevp = mboxp; - mboxp = mboxp->next; - } - if (!mboxp) { - UNLOCK(GLOBAL_mboxq_lock); - return FALSE; - } - if (mboxp == GLOBAL_named_mboxes) { - GLOBAL_named_mboxes = mboxp->next; - } else { - prevp->next = mboxp->next; - } - UNLOCK(GLOBAL_mboxq_lock); - mboxDestroy(mboxp PASS_REGS); - Yap_FreeCodeSpace( (char *)mboxp ); - return TRUE; - } - - static mbox_t* - getMbox(Term t) - { - mbox_t* mboxp; - - if (IsAtomTerm(t=Deref(t))) { - Atom at = AtomOfTerm(t); - LOCK(GLOBAL_mboxq_lock); - if (IsBlob(at)) { - mboxp = (mbox_t *)(RepAtom(at)->rep.blob[0].data); - } else { - mboxp = GLOBAL_named_mboxes; - while( mboxp && mboxp->name != t) { - mboxp = mboxp->next; - } - } - if (!mboxp->open) - mboxp = NULL; - if (mboxp) { - pthread_mutex_lock(& mboxp->mutex); - } - UNLOCK(GLOBAL_mboxq_lock); - } else if (IsIntTerm(t)) { - int wid = IntOfTerm(t); - if (REMOTE(wid) && - (REMOTE_ThreadHandle(wid).in_use || REMOTE_ThreadHandle(wid).zombie)) - { - return &REMOTE_ThreadHandle(wid).mbox_handle; - } else { - return NULL; - } - if (!mboxp->open) - mboxp = NULL; - if (mboxp) { - pthread_mutex_lock(& mboxp->mutex); - } - } else { - return NULL; - } - return mboxp; - } - - - static Int - p_mbox_send( USES_REGS1 ) - { - Term namet = Deref(ARG1); - mbox_t* mboxp = getMbox(namet) ; - - if (!mboxp) - return FALSE; - return mboxSend(mboxp, Deref(ARG2) PASS_REGS); - } - - static Int - p_mbox_size( USES_REGS1 ) - { - Term namet = Deref(ARG1); - mbox_t* mboxp = getMbox(namet) ; - - if (!mboxp) - return FALSE; - return Yap_unify( ARG2, MkIntTerm(mboxp->nmsgs)); - } - - - static Int - p_mbox_receive( USES_REGS1 ) - { - Term namet = Deref(ARG1); - mbox_t* mboxp = getMbox(namet) ; - - if (!mboxp) - return FALSE; - return mboxReceive(mboxp, Deref(ARG2) PASS_REGS); - } - - - static Int - p_mbox_peek( USES_REGS1 ) - { - Term namet = Deref(ARG1); - mbox_t* mboxp = getMbox(namet) ; - - if (!mboxp) - return FALSE; - return mboxPeek(mboxp, Deref(ARG2) PASS_REGS); - } - - static Int - p_cond_destroy( USES_REGS1 ) - { - pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1)); - - if (pthread_cond_destroy(condp) < 0) - return FALSE; - Yap_FreeCodeSpace((void *)condp); - return TRUE; - } - - static Int - p_cond_signal( USES_REGS1 ) - { - pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1)); - - if (pthread_cond_signal(condp) < 0) - return FALSE; - return TRUE; - } - - static Int - p_cond_broadcast( USES_REGS1 ) - { - pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1)); - - if (pthread_cond_broadcast(condp) < 0) - return FALSE; - v return TRUE; - } - - static Int - p_cond_wait( USES_REGS1 ) - { - pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1)); - SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG2)); - pthread_cond_wait(condp, &mut->m); + return Yap_unify(ARG2, MkIntegerTerm(mut->owners)) && + Yap_unify(ARG3, MkIntegerTerm(mut->tid_own)); return TRUE; - } +} - static Int - p_thread_stacks( USES_REGS1 ) - { /* '$thread_signal'(+P) */ - Int tid = IntegerOfTerm(Deref(ARG1)); - Int status= TRUE; +static Int +p_cond_create( USES_REGS1 ) +{ + pthread_cond_t* condp; - MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock)); - if (REMOTE(tid) && - (REMOTE_ThreadHandle(tid).in_use || REMOTE_ThreadHandle(tid).zombie)) { - status &= Yap_unify(ARG2,MkIntegerTerm(REMOTE_ThreadHandle(tid).ssize)); - status &= Yap_unify(ARG3,MkIntegerTerm(REMOTE_ThreadHandle(tid).tsize)); - status &= Yap_unify(ARG4,MkIntegerTerm(REMOTE_ThreadHandle(tid).sysize)); - MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock)); - return status; - } - MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock)); - return FALSE; - } + condp = (pthread_cond_t *)Yap_AllocCodeSpace(sizeof(pthread_cond_t)); + if (condp == NULL) { + return FALSE; + } + pthread_cond_init(condp, NULL); + return Yap_unify(ARG1, MkIntegerTerm((Int)condp)); +} - static Int - p_thread_atexit( USES_REGS1 ) - { /* '$thread_signal'(+P) */ - Term t; +typedef struct { + UInt indx; + mbox_t mbox; +} counted_mbox; - if (LOCAL_ThreadHandle.texit == NULL || - LOCAL_ThreadHandle.texit->Entry == MkAtomTerm(AtomTrue)) { - return FALSE; - } - do { - t = Yap_PopTermFromDB(LOCAL_ThreadHandle.texit); - if (t == 0) { - if (LOCAL_Error_TYPE == OUT_OF_ATTVARS_ERROR) { - LOCAL_Error_TYPE = YAP_NO_ERROR; - if (!Yap_growglobal(NULL)) { - Yap_Error(OUT_OF_ATTVARS_ERROR, TermNil, LOCAL_ErrorMessage); - thread_die(worker_id, FALSE); - return FALSE; - } - } else { - LOCAL_Error_TYPE = YAP_NO_ERROR; - if (!Yap_growstack(LOCAL_ThreadHandle.tgoal->NOfCells*CellSize)) { - Yap_Error(OUT_OF_STACK_ERROR, TermNil, LOCAL_ErrorMessage); - thread_die(worker_id, FALSE); - return FALSE; - } - } - } - } while (t == 0); - LOCAL_ThreadHandle.texit = NULL; - return Yap_unify(ARG1, t) && Yap_unify(ARG2, LOCAL_ThreadHandle.texit_mod); - } +static Int +p_mbox_create( USES_REGS1 ) +{ + Term namet = Deref(ARG1); + mbox_t* mboxp = GLOBAL_named_mboxes; + + if (IsVarTerm(namet)) { + AtomEntry *ae; + int new; + mbox_t mbox; + + ae = Yap_lookupBlob(&mbox, sizeof(mbox), &PL_Message_Queue, &new); + namet = MkAtomTerm(RepAtom(ae)); + mboxp = (mbox_t *)(ae->rep.blob[0].data); + Yap_unify(ARG1, namet); + LOCK(GLOBAL_mboxq_lock); + } else if (IsAtomTerm(namet)) { + LOCK(GLOBAL_mboxq_lock); + while( mboxp && mboxp->name != namet) + mboxp = mboxp->next; + if (mboxp) { + UNLOCK(GLOBAL_mboxq_lock); + return FALSE; + } + mboxp = (mbox_t *)Yap_AllocCodeSpace(sizeof(mbox_t)); + if (mboxp == NULL) { + UNLOCK(GLOBAL_mboxq_lock); + return FALSE; + } + // global mbox, for now we'll just insert in list + mboxp->next = GLOBAL_named_mboxes; + GLOBAL_named_mboxes = mboxp; + } + bool rc = mboxCreate( namet, mboxp PASS_REGS ); + UNLOCK(GLOBAL_mboxq_lock); + return rc; +} + +static Int +p_mbox_destroy( USES_REGS1 ) +{ + Term namet = Deref(ARG1); + mbox_t* mboxp = GLOBAL_named_mboxes, *prevp; + + if (IsVarTerm(namet) ) + return FALSE; + if (IsIntTerm(namet) ) { + return FALSE; + } + LOCK(GLOBAL_mboxq_lock); + prevp = NULL; + while( mboxp && mboxp->name != namet) { + prevp = mboxp; + mboxp = mboxp->next; + } + if (!mboxp) { + UNLOCK(GLOBAL_mboxq_lock); + return FALSE; + } + if (mboxp == GLOBAL_named_mboxes) { + GLOBAL_named_mboxes = mboxp->next; + } else { + prevp->next = mboxp->next; + } + UNLOCK(GLOBAL_mboxq_lock); + mboxDestroy(mboxp PASS_REGS); + Yap_FreeCodeSpace( (char *)mboxp ); + return TRUE; +} + +static mbox_t* +getMbox(Term t) +{ + mbox_t* mboxp; + + if (IsAtomTerm(t=Deref(t))) { + Atom at = AtomOfTerm(t); + LOCK(GLOBAL_mboxq_lock); + if (IsBlob(at)) { + mboxp = (mbox_t *)(RepAtom(at)->rep.blob[0].data); + } else { + mboxp = GLOBAL_named_mboxes; + while( mboxp && mboxp->name != t) { + mboxp = mboxp->next; + } + } + if (!mboxp->open) + mboxp = NULL; + if (mboxp) { + pthread_mutex_lock(& mboxp->mutex); + } + UNLOCK(GLOBAL_mboxq_lock); + } else if (IsIntTerm(t)) { + int wid = IntOfTerm(t); + if (REMOTE(wid) && + (REMOTE_ThreadHandle(wid).in_use || REMOTE_ThreadHandle(wid).zombie)) + { + return &REMOTE_ThreadHandle(wid).mbox_handle; + } else { + return NULL; + } + if (!mboxp->open) + mboxp = NULL; + if (mboxp) { + pthread_mutex_lock(& mboxp->mutex); + } + } else { + return NULL; + } + return mboxp; +} +static Int +p_mbox_send( USES_REGS1 ) +{ + Term namet = Deref(ARG1); + mbox_t* mboxp = getMbox(namet) ; - static Int - p_thread_signal( USES_REGS1 ) - { /* '$thread_signal'(+P) */ - Int wid = IntegerOfTerm(Deref(ARG1)); - /* make sure the lock is available */ - MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock)); - if (!REMOTE_ThreadHandle(wid).in_use || - !REMOTE_ThreadHandle(wid).current_yaam_regs) { - MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock)); + if (!mboxp) + return FALSE; + return mboxSend(mboxp, Deref(ARG2) PASS_REGS); +} + +static Int +p_mbox_size( USES_REGS1 ) +{ + Term namet = Deref(ARG1); + mbox_t* mboxp = getMbox(namet) ; + + if (!mboxp) + return FALSE; + return Yap_unify( ARG2, MkIntTerm(mboxp->nmsgs)); +} + + +static Int +p_mbox_receive( USES_REGS1 ) +{ + Term namet = Deref(ARG1); + mbox_t* mboxp = getMbox(namet) ; + + if (!mboxp) + return FALSE; + return mboxReceive(mboxp, Deref(ARG2) PASS_REGS); +} + + +static Int +p_mbox_peek( USES_REGS1 ) +{ + Term namet = Deref(ARG1); + mbox_t* mboxp = getMbox(namet) ; + + if (!mboxp) + return FALSE; + return mboxPeek(mboxp, Deref(ARG2) PASS_REGS); +} + +static Int +p_cond_destroy( USES_REGS1 ) +{ + pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1)); + + if (pthread_cond_destroy(condp) < 0) + return FALSE; + Yap_FreeCodeSpace((void *)condp); + return TRUE; +} + +static Int +p_cond_signal( USES_REGS1 ) +{ + pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1)); + + if (pthread_cond_signal(condp) < 0) + return FALSE; + return TRUE; +} + +static Int +p_cond_broadcast( USES_REGS1 ) +{ + pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1)); + + if (pthread_cond_broadcast(condp) < 0) + return FALSE; + else return TRUE; - } - Yap_external_signal( wid, YAP_ITI_SIGNAL ); - MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock)); - return TRUE; - } +} - static Int - p_no_threads( USES_REGS1 ) - { /* '$thread_signal'(+P) */ - return FALSE; - } +static Int +p_cond_wait( USES_REGS1 ) +{ + pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1)); + SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG2)); + pthread_cond_wait(condp, &mut->m); + return TRUE; +} - static Int - p_nof_threads( USES_REGS1 ) - { /* '$nof_threads'(+P) */ - int i = 0, wid; - LOCK(GLOBAL_ThreadHandlesLock); - for (wid = 0; wid < MAX_THREADS; wid++) { - if (!Yap_local[wid]) break; - if (REMOTE_ThreadHandle(wid).in_use) - i++; - } - UNLOCK(GLOBAL_ThreadHandlesLock); - return Yap_unify(ARG1,MkIntegerTerm(i)); - } +static Int +p_thread_stacks( USES_REGS1 ) +{ /* '$thread_signal'(+P) */ + Int tid = IntegerOfTerm(Deref(ARG1)); + Int status= TRUE; - static Int - p_max_workers( USES_REGS1 ) - { /* '$max_workers'(+P) */ - return Yap_unify(ARG1,MkIntegerTerm(MAX_WORKERS)); - } + MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock)); + if (REMOTE(tid) && + (REMOTE_ThreadHandle(tid).in_use || REMOTE_ThreadHandle(tid).zombie)) { + status &= Yap_unify(ARG2,MkIntegerTerm(REMOTE_ThreadHandle(tid).ssize)); + status &= Yap_unify(ARG3,MkIntegerTerm(REMOTE_ThreadHandle(tid).tsize)); + status &= Yap_unify(ARG4,MkIntegerTerm(REMOTE_ThreadHandle(tid).sysize)); + MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock)); + return status; + } + MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock)); + return FALSE; +} - static Int - p_max_threads( USES_REGS1 ) - { /* '$max_threads'(+P) */ - return Yap_unify(ARG1,MkIntegerTerm(MAX_THREADS)); - } +static Int +p_thread_atexit( USES_REGS1 ) +{ /* '$thread_signal'(+P) */ + Term t; - static Int - p_nof_threads_created( USES_REGS1 ) - { /* '$nof_threads'(+P) */ - return Yap_unify(ARG1,MkIntTerm(GLOBAL_NOfThreadsCreated)); - } - - static Int - p_thread_runtime( USES_REGS1 ) - { /* '$thread_runtime'(+P) */ - return Yap_unify(ARG1,MkIntegerTerm(GLOBAL_ThreadsTotalTime)); - } - - static Int - p_thread_self_lock( USES_REGS1 ) - { /* '$thread_unlock' */ - MUTEX_LOCK(&(LOCAL_ThreadHandle.tlock)); - return Yap_unify(ARG1,MkIntegerTerm(worker_id)); - } - - static Int - p_thread_unlock( USES_REGS1 ) - { /* '$thread_unlock' */ - Int wid = IntegerOfTerm(Deref(ARG1)); - MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock)); - return TRUE; - } - - intptr_t - system_thread_id(PL_thread_info_t *info) - { if ( !info ) - { CACHE_REGS - if ( LOCAL ) - info = SWI_thread_info(worker_id, NULL); - else - return -1; - } - #ifdef __linux__ - return info->pid; - #else - #ifdef __WINDOWS__ - return info->w32id; - #else - return (intptr_t)info->tid; - #endif - #endif - } - - void - Yap_InitFirstWorkerThreadHandle(void) - { - CACHE_REGS - set_system_thread_id(0, NULL); - LOCAL_ThreadHandle.id = 0; - LOCAL_ThreadHandle.in_use = TRUE; - LOCAL_ThreadHandle.default_yaam_regs = - &Yap_standard_regs; - LOCAL_ThreadHandle.current_yaam_regs = - &Yap_standard_regs; - LOCAL_PL_local_data_p->reg_cache = - &Yap_standard_regs; - LOCAL_ThreadHandle.pthread_handle = pthread_self(); - pthread_mutex_init(&REMOTE_ThreadHandle(0).tlock, NULL); - pthread_mutex_init(&REMOTE_ThreadHandle(0).tlock_status, NULL); - LOCAL_ThreadHandle.tdetach = MkAtomTerm(AtomFalse); - LOCAL_ThreadHandle.ref_count = 1; - } - - FILE *debugf; - - void Yap_InitThreadPreds(void) - { + if (LOCAL_ThreadHandle.texit == NULL || + LOCAL_ThreadHandle.texit->Entry == MkAtomTerm(AtomTrue)) { + return FALSE; + } + do { + t = Yap_PopTermFromDB(LOCAL_ThreadHandle.texit); + if (t == 0) { + if (LOCAL_Error_TYPE == OUT_OF_ATTVARS_ERROR) { + LOCAL_Error_TYPE = YAP_NO_ERROR; + if (!Yap_growglobal(NULL)) { + Yap_Error(OUT_OF_ATTVARS_ERROR, TermNil, LOCAL_ErrorMessage); + thread_die(worker_id, FALSE); + return FALSE; + } + } else { + LOCAL_Error_TYPE = YAP_NO_ERROR; + if (!Yap_growstack(LOCAL_ThreadHandle.tgoal->NOfCells*CellSize)) { + Yap_Error(OUT_OF_STACK_ERROR, TermNil, LOCAL_ErrorMessage); + thread_die(worker_id, FALSE); + return FALSE; + } + } + } + } while (t == 0); + LOCAL_ThreadHandle.texit = NULL; + return Yap_unify(ARG1, t) && Yap_unify(ARG2, LOCAL_ThreadHandle.texit_mod); +} - Yap_InitCPred("$no_threads", 0, p_no_threads, 0); - Yap_InitCPred("$max_workers", 1, p_max_workers, 0); - Yap_InitCPred("$max_threads", 1, p_max_threads, 0); - Yap_InitCPred("$thread_new_tid", 1, p_thread_new_tid, 0); - Yap_InitCPred("$create_thread", 7, p_create_thread, 0); - Yap_InitCPred("$thread_self", 1, p_thread_self, SafePredFlag); - Yap_InitCPred("$thread_status_lock", 1, p_thread_status_lock, SafePredFlag); - Yap_InitCPred("$thread_status_unlock", 1, p_thread_status_unlock, SafePredFlag); - Yap_InitCPred("$thread_zombie_self", 1, p_thread_zombie_self, SafePredFlag); - Yap_InitCPred("$thread_join", 1, p_thread_join, 0); - Yap_InitCPred("$thread_destroy", 1, p_thread_destroy, 0); - Yap_InitCPred("thread_yield", 0, p_thread_yield, 0); - /** @pred thread_yield + +static Int +p_thread_signal( USES_REGS1 ) +{ /* '$thread_signal'(+P) */ + Int wid = IntegerOfTerm(Deref(ARG1)); + /* make sure the lock is available */ + MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock)); + if (!REMOTE_ThreadHandle(wid).in_use || + !REMOTE_ThreadHandle(wid).current_yaam_regs) { + MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock)); + return TRUE; + } + Yap_external_signal( wid, YAP_ITI_SIGNAL ); + MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock)); + return TRUE; +} + +static Int +p_no_threads( USES_REGS1 ) +{ /* '$thread_signal'(+P) */ + return FALSE; +} + +static Int +p_nof_threads( USES_REGS1 ) +{ /* '$nof_threads'(+P) */ + int i = 0, wid; + LOCK(GLOBAL_ThreadHandlesLock); + for (wid = 0; wid < MAX_THREADS; wid++) { + if (!Yap_local[wid]) break; + if (REMOTE_ThreadHandle(wid).in_use) + i++; + } + UNLOCK(GLOBAL_ThreadHandlesLock); + return Yap_unify(ARG1,MkIntegerTerm(i)); +} + +static Int +p_max_workers( USES_REGS1 ) +{ /* '$max_workers'(+P) */ + return Yap_unify(ARG1,MkIntegerTerm(MAX_WORKERS)); +} + +static Int +p_max_threads( USES_REGS1 ) +{ /* '$max_threads'(+P) */ + return Yap_unify(ARG1,MkIntegerTerm(MAX_THREADS)); +} + +static Int +p_nof_threads_created( USES_REGS1 ) +{ /* '$nof_threads'(+P) */ + return Yap_unify(ARG1,MkIntTerm(GLOBAL_NOfThreadsCreated)); +} + +static Int +p_thread_runtime( USES_REGS1 ) +{ /* '$thread_runtime'(+P) */ + return Yap_unify(ARG1,MkIntegerTerm(GLOBAL_ThreadsTotalTime)); +} + +static Int +p_thread_self_lock( USES_REGS1 ) +{ /* '$thread_unlock' */ + MUTEX_LOCK(&(LOCAL_ThreadHandle.tlock)); + return Yap_unify(ARG1,MkIntegerTerm(worker_id)); +} + +static Int +p_thread_unlock( USES_REGS1 ) +{ /* '$thread_unlock' */ + Int wid = IntegerOfTerm(Deref(ARG1)); + MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock)); + return TRUE; +} + +intptr_t +system_thread_id(PL_thread_info_t *info) +{ if ( !info ) + { CACHE_REGS + if ( LOCAL ) + info = SWI_thread_info(worker_id, NULL); + else + return -1; + } +#ifdef __linux__ + return info->pid; +#else +#ifdef __WINDOWS__ + return info->w32id; +#else + return (intptr_t)info->tid; +#endif +#endif +} + +void +Yap_InitFirstWorkerThreadHandle(void) +{ + CACHE_REGS + set_system_thread_id(0, NULL); + LOCAL_ThreadHandle.id = 0; + LOCAL_ThreadHandle.in_use = TRUE; + LOCAL_ThreadHandle.default_yaam_regs = + &Yap_standard_regs; + LOCAL_ThreadHandle.current_yaam_regs = + &Yap_standard_regs; + LOCAL_PL_local_data_p->reg_cache = + &Yap_standard_regs; + LOCAL_ThreadHandle.pthread_handle = pthread_self(); + pthread_mutex_init(&REMOTE_ThreadHandle(0).tlock, NULL); + pthread_mutex_init(&REMOTE_ThreadHandle(0).tlock_status, NULL); + LOCAL_ThreadHandle.tdetach = MkAtomTerm(AtomFalse); + LOCAL_ThreadHandle.ref_count = 1; +} + +FILE *debugf; + +void Yap_InitThreadPreds(void) +{ - Voluntarily relinquish the processor. + Yap_InitCPred("$no_threads", 0, p_no_threads, 0); + Yap_InitCPred("$max_workers", 1, p_max_workers, 0); + Yap_InitCPred("$max_threads", 1, p_max_threads, 0); + Yap_InitCPred("$thread_new_tid", 1, p_thread_new_tid, 0); + Yap_InitCPred("$create_thread", 7, p_create_thread, 0); + Yap_InitCPred("$thread_self", 1, p_thread_self, SafePredFlag); + Yap_InitCPred("$thread_status_lock", 1, p_thread_status_lock, SafePredFlag); + Yap_InitCPred("$thread_status_unlock", 1, p_thread_status_unlock, SafePredFlag); + Yap_InitCPred("$thread_zombie_self", 1, p_thread_zombie_self, SafePredFlag); + Yap_InitCPred("$thread_join", 1, p_thread_join, 0); + Yap_InitCPred("$thread_destroy", 1, p_thread_destroy, 0); + Yap_InitCPred("thread_yield", 0, p_thread_yield, 0); + /** @pred thread_yield - */ - Yap_InitCPred("$detach_thread", 1, p_thread_detach, 0); - Yap_InitCPred("$thread_detached", 1, p_thread_detached, 0); - Yap_InitCPred("$thread_detached", 2, p_thread_detached2, 0); - Yap_InitCPred("$thread_exit", 0, p_thread_exit, 0); - Yap_InitCPred("thread_setconcurrency", 2, p_thread_set_concurrency, 0); - /** @pred thread_setconcurrency(+ _Old_, - _New_) + Voluntarily relinquish the processor. - Determine the concurrency of the process, which is defined as the - maximum number of concurrently active threads. `Active` here means - they are using CPU time. This option is provided if the - thread-implementation provides - `pthread_setconcurrency()`. Solaris is a typical example of this - family. On other systems this predicate unifies _Old_ to 0 (zero) - and succeeds silently. + */ + Yap_InitCPred("$detach_thread", 1, p_thread_detach, 0); + Yap_InitCPred("$thread_detached", 1, p_thread_detached, 0); + Yap_InitCPred("$thread_detached", 2, p_thread_detached2, 0); + Yap_InitCPred("$thread_exit", 0, p_thread_exit, 0); + Yap_InitCPred("thread_setconcurrency", 2, p_thread_set_concurrency, 0); + /** @pred thread_setconcurrency(+ _Old_, - _New_) - */ - Yap_InitCPred("$valid_thread", 1, p_valid_thread, 0); - Yap_InitCPred("$new_mutex", 1, p_new_mutex, SafePredFlag); - Yap_InitCPred("$destroy_mutex", 1, p_destroy_mutex, SafePredFlag); - Yap_InitCPred("$lock_mutex", 1, p_lock_mutex, SafePredFlag); - Yap_InitCPred("$trylock_mutex", 1, p_trylock_mutex, SafePredFlag); - Yap_InitCPred("$unlock_mutex", 1, p_unlock_mutex, SafePredFlag); - Yap_InitCPred("$with_mutex", 2, p_with_mutex, MetaPredFlag); - Yap_InitCPred("$with_with_mutex", 1, p_with_with_mutex, 0); - Yap_InitCPred("$unlock_with_mutex", 1, p_unlock_with_mutex, 0); - Yap_InitCPred("$mutex_info", 3, p_mutex_info, SafePredFlag); - Yap_InitCPred("$cond_create", 1, p_cond_create, SafePredFlag); - Yap_InitCPred("$cond_destroy", 1, p_cond_destroy, SafePredFlag); - Yap_InitCPred("$cond_signal", 1, p_cond_signal, SafePredFlag); - Yap_InitCPred("$cond_broadcast", 1, p_cond_broadcast, SafePredFlag); - Yap_InitCPred("$cond_wait", 2, p_cond_wait, SafePredFlag); - Yap_InitCPred("$message_queue_create", 1, p_mbox_create, SafePredFlag); - Yap_InitCPred("$message_queue_destroy", 1, p_mbox_destroy, SafePredFlag); - Yap_InitCPred("$message_queue_send", 2, p_mbox_send, SafePredFlag); - Yap_InitCPred("$message_queue_receive", 2, p_mbox_receive, SafePredFlag); - Yap_InitCPred("$message_queue_size", 2, p_mbox_size, SafePredFlag); - Yap_InitCPred("$message_queue_peek", 2, p_mbox_peek, SafePredFlag); - Yap_InitCPred("$thread_stacks", 4, p_thread_stacks, SafePredFlag); - Yap_InitCPred("$signal_thread", 1, p_thread_signal, SafePredFlag); - Yap_InitCPred("$nof_threads", 1, p_nof_threads, SafePredFlag); - Yap_InitCPred("$nof_threads_created", 1, p_nof_threads_created, SafePredFlag); - Yap_InitCPred("$thread_sleep", 4, p_thread_sleep, SafePredFlag); - Yap_InitCPred("$thread_runtime", 1, p_thread_runtime, SafePredFlag); - Yap_InitCPred("$thread_self_lock", 1, p_thread_self_lock, SafePredFlag); - Yap_InitCPred("$thread_run_at_exit", 2, p_thread_atexit, SafePredFlag); - Yap_InitCPred("$thread_unlock", 1, p_thread_unlock, SafePredFlag); - #if DEBUG_LOCKS||DEBUG_PE_LOCKS - Yap_InitCPred("debug_locks", 0, p_debug_locks, SafePredFlag); - Yap_InitCPred("nodebug_locks", 0, p_nodebug_locks, SafePredFlag); - #endif - } - - #else - - int - Yap_NOfThreads(void) { - // GLOBAL_ThreadHandlesLock is held - #ifdef YAPOR - return 2; - #else - return 1; - #endif - } + Determine the concurrency of the process, which is defined as the + maximum number of concurrently active threads. `Active` here means + they are using CPU time. This option is provided if the + thread-implementation provides + `pthread_setconcurrency()`. Solaris is a typical example of this + family. On other systems this predicate unifies _Old_ to 0 (zero) + and succeeds silently. - static Int - p_no_threads(void) - { /* '$thread_signal'(+P) */ - return TRUE; - } + */ + Yap_InitCPred("$valid_thread", 1, p_valid_thread, 0); + Yap_InitCPred("$new_mutex", 1, p_new_mutex, SafePredFlag); + Yap_InitCPred("$destroy_mutex", 1, p_destroy_mutex, SafePredFlag); + Yap_InitCPred("$lock_mutex", 1, p_lock_mutex, SafePredFlag); + Yap_InitCPred("$trylock_mutex", 1, p_trylock_mutex, SafePredFlag); + Yap_InitCPred("$unlock_mutex", 1, p_unlock_mutex, SafePredFlag); + Yap_InitCPred("$with_mutex", 2, p_with_mutex, MetaPredFlag); + Yap_InitCPred("$with_with_mutex", 1, p_with_with_mutex, 0); + Yap_InitCPred("$unlock_with_mutex", 1, p_unlock_with_mutex, 0); + Yap_InitCPred("$mutex_info", 3, p_mutex_info, SafePredFlag); + Yap_InitCPred("$cond_create", 1, p_cond_create, SafePredFlag); + Yap_InitCPred("$cond_destroy", 1, p_cond_destroy, SafePredFlag); + Yap_InitCPred("$cond_signal", 1, p_cond_signal, SafePredFlag); + Yap_InitCPred("$cond_broadcast", 1, p_cond_broadcast, SafePredFlag); + Yap_InitCPred("$cond_wait", 2, p_cond_wait, SafePredFlag); + Yap_InitCPred("$message_queue_create", 1, p_mbox_create, SafePredFlag); + Yap_InitCPred("$message_queue_destroy", 1, p_mbox_destroy, SafePredFlag); + Yap_InitCPred("$message_queue_send", 2, p_mbox_send, SafePredFlag); + Yap_InitCPred("$message_queue_receive", 2, p_mbox_receive, SafePredFlag); + Yap_InitCPred("$message_queue_size", 2, p_mbox_size, SafePredFlag); + Yap_InitCPred("$message_queue_peek", 2, p_mbox_peek, SafePredFlag); + Yap_InitCPred("$thread_stacks", 4, p_thread_stacks, SafePredFlag); + Yap_InitCPred("$signal_thread", 1, p_thread_signal, SafePredFlag); + Yap_InitCPred("$nof_threads", 1, p_nof_threads, SafePredFlag); + Yap_InitCPred("$nof_threads_created", 1, p_nof_threads_created, SafePredFlag); + Yap_InitCPred("$thread_sleep", 4, p_thread_sleep, SafePredFlag); + Yap_InitCPred("$thread_runtime", 1, p_thread_runtime, SafePredFlag); + Yap_InitCPred("$thread_self_lock", 1, p_thread_self_lock, SafePredFlag); + Yap_InitCPred("$thread_run_at_exit", 2, p_thread_atexit, SafePredFlag); + Yap_InitCPred("$thread_unlock", 1, p_thread_unlock, SafePredFlag); +#if DEBUG_LOCKS||DEBUG_PE_LOCKS + Yap_InitCPred("debug_locks", 0, p_debug_locks, SafePredFlag); + Yap_InitCPred("nodebug_locks", 0, p_nodebug_locks, SafePredFlag); +#endif +} - static Int - p_nof_threads(void) - { /* '$nof_threads'(+P) */ - return Yap_unify(ARG1,MkIntTerm(1)); - } +#else - static Int - p_max_threads(void) - { /* '$nof_threads'(+P) */ - return Yap_unify(ARG1,MkIntTerm(1)); - } +int +Yap_NOfThreads(void) { + // GLOBAL_ThreadHandlesLock is held +#ifdef YAPOR + return 2; +#else + return 1; +#endif +} - static Int - p_nof_threads_created(void) - { /* '$nof_threads'(+P) */ - return Yap_unify(ARG1,MkIntTerm(1)); - } - static Int - p_thread_runtime(void) - { /* '$thread_runtime'(+P) */ - return Yap_unify(ARG1,MkIntTerm(0)); - } +static Int +p_no_threads(void) +{ /* '$thread_signal'(+P) */ + return TRUE; +} - static Int - p_thread_self(void) - { /* '$thread_runtime'(+P) */ - return Yap_unify(ARG1,MkIntTerm(0)); - } +static Int +p_nof_threads(void) +{ /* '$nof_threads'(+P) */ + return Yap_unify(ARG1,MkIntTerm(1)); +} - static Int - p_thread_stacks(void) - { /* '$thread_runtime'(+P) */ - return FALSE; - } +static Int +p_max_threads(void) +{ /* '$nof_threads'(+P) */ + return Yap_unify(ARG1,MkIntTerm(1)); +} - static Int - p_thread_unlock(void) - { /* '$thread_runtime'(+P) */ - return TRUE; - } +static Int +p_nof_threads_created(void) +{ /* '$nof_threads'(+P) */ + return Yap_unify(ARG1,MkIntTerm(1)); +} - static Int - p_max_workers(void) - { /* '$max_workers'(+P) */ - return Yap_unify(ARG1,MkIntTerm(1)); - } +static Int +p_thread_runtime(void) +{ /* '$thread_runtime'(+P) */ + return Yap_unify(ARG1,MkIntTerm(0)); +} - static Int - p_new_mutex(void) +static Int +p_thread_self(void) +{ /* '$thread_runtime'(+P) */ + return Yap_unify(ARG1,MkIntTerm(0)); +} + +static Int +p_thread_stacks(void) +{ /* '$thread_runtime'(+P) */ + return FALSE; +} + +static Int +p_thread_unlock(void) +{ /* '$thread_runtime'(+P) */ + return TRUE; +} + +static Int +p_max_workers(void) +{ /* '$max_workers'(+P) */ + return Yap_unify(ARG1,MkIntTerm(1)); +} + +static Int +p_new_mutex(void) { /* '$max_workers'(+P) */ static int mutexes = 1; return Yap_unify(ARG1, MkIntegerTerm(mutexes++) ); } - static Int - p_with_mutex( USES_REGS1 ) - { - Int mut; - Term t1 = Deref(ARG1), excep; - Int rc = FALSE; - Int creeping = Yap_get_signal(YAP_CREEP_SIGNAL); - PredEntry *pe; - Term tm = CurrentModule; - Term tg = Deref(ARG2); +static Int +p_with_mutex( USES_REGS1 ) +{ + Int mut; + Term t1 = Deref(ARG1), excep; + Int rc = FALSE; + Int creeping = Yap_get_signal(YAP_CREEP_SIGNAL); + PredEntry *pe; + Term tm = CurrentModule; + Term tg = Deref(ARG2); - if (IsVarTerm(t1)) { - p_new_mutex( PASS_REGS1 ); - t1 = Deref(ARG1); - mut = IntOfTerm(t1); - } + if (IsVarTerm(t1)) { + p_new_mutex( PASS_REGS1 ); + t1 = Deref(ARG1); + mut = IntOfTerm(t1); + } - tg = Yap_StripModule(tg, &tm); - if (IsVarTerm(tg)) { - Yap_Error(INSTANTIATION_ERROR, ARG2, "with_mutex/2"); - goto end; - } else if (IsApplTerm(tg)) { - register Functor f = FunctorOfTerm(tg); - register CELL *pt; - size_t i, arity; + tg = Yap_StripModule(tg, &tm); + if (IsVarTerm(tg)) { + Yap_Error(INSTANTIATION_ERROR, ARG2, "with_mutex/2"); + goto end; + } else if (IsApplTerm(tg)) { + register Functor f = FunctorOfTerm(tg); + register CELL *pt; + size_t i, arity; f = FunctorOfTerm(tg); - if (IsExtensionFunctor(f)) { - Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2"); - goto end; - } - arity = ArityOfFunctor(f); - if (arity > MaxTemps) { - Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2"); - goto end; - } - pe = RepPredProp(PredPropByFunc(f, tm)); - pt = RepAppl(tg)+1; - for (i= 0; i < arity; i++ ) - XREGS[i+1] = pt[i]; - } else if (IsAtomTerm(tg)) { - pe = RepPredProp(PredPropByAtom(AtomOfTerm(tg), tm)); - } else if (IsPairTerm(tg)) { - register CELL *pt; - Functor f; + if (IsExtensionFunctor(f)) { + Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2"); + goto end; + } + arity = ArityOfFunctor(f); + if (arity > MaxTemps) { + Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2"); + goto end; + } + pe = RepPredProp(PredPropByFunc(f, tm)); + pt = RepAppl(tg)+1; + for (i= 0; i < arity; i++ ) + XREGS[i+1] = pt[i]; + } else if (IsAtomTerm(tg)) { + pe = RepPredProp(PredPropByAtom(AtomOfTerm(tg), tm)); + } else if (IsPairTerm(tg)) { + register CELL *pt; + Functor f; - f = FunctorDot; - pe = RepPredProp(PredPropByFunc(f, tm)); - pt = RepPair(tg); - XREGS[1] = pt[0]; - XREGS[2] = pt[1]; - } else { - Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2"); - goto end; - } - if ( - pe->OpcodeOfPred != FAIL_OPCODE && - Yap_execute_pred(pe, NULL PASS_REGS) ) { - rc = TRUE; - } - end: - ARG1 = MkIntegerTerm(mut); - excep = Yap_GetException(); - if (creeping) { - Yap_signal( YAP_CREEP_SIGNAL ); - } else if ( excep != 0) { - return Yap_JumpToEnv(excep); - } - return rc; - } + f = FunctorDot; + pe = RepPredProp(PredPropByFunc(f, tm)); + pt = RepPair(tg); + XREGS[1] = pt[0]; + XREGS[2] = pt[1]; + } else { + Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2"); + goto end; + } + if ( + pe->OpcodeOfPred != FAIL_OPCODE && + Yap_execute_pred(pe, NULL PASS_REGS) ) { + rc = TRUE; + } + end: + ARG1 = MkIntegerTerm(mut); + excep = Yap_GetException(); + if (creeping) { + Yap_signal( YAP_CREEP_SIGNAL ); + } else if ( excep != 0) { + return Yap_JumpToEnv(excep); + } + return rc; +} void Yap_InitFirstWorkerThreadHandle(void) @@ -1710,5 +1711,5 @@ void Yap_InitThreadPreds(void) /** -@} + @} */ diff --git a/H/Yatom.h b/H/Yatom.h index 3a75271f3..d869673e3 100755 --- a/H/Yatom.h +++ b/H/Yatom.h @@ -1242,7 +1242,7 @@ RepTranslationProp (Prop p) INLINE_ONLY inline EXTERN Prop AbsTranslationProp (TranslationEntry * p); -INLINE_ONLY inline EXTERN Prop + INLINE_ONLY inline EXTERN Prop AbsTranslationProp (TranslationEntry * p) { return (Prop) (p); @@ -1252,7 +1252,7 @@ AbsTranslationProp (TranslationEntry * p) #endif #define TranslationProperty 0xfff4 -void Yap_PutAtomTranslation(Atom a, Int i); +bool Yap_PutAtomTranslation(Atom a, Int i); /* get translation prop for atom; */ static inline TranslationEntry * @@ -1270,10 +1270,7 @@ Yap_GetTranslationProp(Atom at) if (p0 == NIL) return (TranslationEntry *)NULL; return p; } - - -/* only unary and binary expressions are acceptable */ - + INLINE_ONLY inline EXTERN PropFlags IsTranslationProperty (int); INLINE_ONLY inline EXTERN PropFlags @@ -1282,7 +1279,90 @@ IsTranslationProperty (int flags) return (PropFlags) ((flags == TranslationProperty)); } +/*** handle named mutexes */ +/* translationnamed mutex property entry structure */ + typedef struct mutex_entry + { + Prop NextOfPE; /* used to chain properties */ + PropFlags KindOfPE; /* kind of property */ + void *Mutex; /* used to hash the atom as an integer; */ + } MutexEntry; + +#if USE_OFFSETS_IN_PROPS + + INLINE_ONLY inline EXTERN MutexEntry *RepMutexProp (Prop p); + + INLINE_ONLY inline EXTERN MutexEntry * + RepMutexProp (Prop p) + { + return (MutexEntry *) (AtomBase + Unsigned (p)); + } + + + + INLINE_ONLY inline EXTERN Prop AbsMutexProp (MutexEntry * p); + + INLINE_ONLY inline EXTERN Prop + AbsMutexProp (MutexEntry * p) + { + return (Prop) (Addr (p) - AtomBase); + } + + +#else + + INLINE_ONLY inline EXTERN MutexEntry *RepMutexProp (Prop p); + + INLINE_ONLY inline EXTERN MutexEntry * + RepMutexProp (Prop p) + { + return (MutexEntry *) (p); + } + + + + INLINE_ONLY inline EXTERN Prop AbsMutexProp (MutexEntry * p); + + INLINE_ONLY inline EXTERN Prop + AbsMutexProp (MutexEntry * p) + { + return (Prop) (p); + } + + +#endif +#define MutexProperty 0xfff5 + + bool Yap_PutAtomMutex(Atom a, void *ptr); + + /* get mutex prop for atom; */ + static inline MutexEntry * + Yap_GetMutexProp(Atom at) + { + Prop p0; + AtomEntry *ae = RepAtom(at); + MutexEntry *p; + + READ_LOCK(ae->ARWLock); + p = RepMutexProp(p0 = ae->PropsOfAE); + while (p0 && p->KindOfPE != MutexProperty) + p = RepMutexProp(p0 = p->NextOfPE); + READ_UNLOCK(ae->ARWLock); + if (p0 == NIL) return NULL; + return p; + } + + INLINE_ONLY inline EXTERN PropFlags IsMutexProperty (int); + + INLINE_ONLY inline EXTERN PropFlags + IsMutexProperty (int flags) + { + return (PropFlags) ((flags == MutexProperty)); + } + + /* end of code for named mutexes */ + typedef enum { STATIC_ARRAY = 1, DYNAMIC_ARRAY = 2,