From 3c7779ec78113516f211b5a94dbefdf3ff4aa88b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADtor=20Santos=20Costa?= Date: Mon, 13 Oct 2014 12:34:52 +0100 Subject: [PATCH] move message queues to C --- C/dbase.c | 331 ++++++++++++++++++++---------------------- C/globals.c | 14 +- C/init.c | 16 ++ C/threads.c | 255 ++++++++++++++++++++++++++++++-- C/tracer.c | 2 +- H/Regs.h | 4 + H/Yap.h | 33 +++++ H/Yatom.h | 2 +- H/dglobals.h | 2 + H/hglobals.h | 2 + H/iglobals.h | 2 + H/rglobals.h | 2 + misc/GLOBALS | 2 + misc/buildlocalglobal | 26 ++-- pl/threads.yap | 166 ++++----------------- 15 files changed, 516 insertions(+), 343 deletions(-) diff --git a/C/dbase.c b/C/dbase.c index 5a30bce19..2ed037510 100644 --- a/C/dbase.c +++ b/C/dbase.c @@ -181,21 +181,6 @@ typedef struct { } SFKeep; #endif -typedef struct queue_entry { - struct queue_entry *next; - DBTerm *DBT; -} QueueEntry; - -typedef struct idb_queue -{ - Functor id; /* identify this as being pointed to by a DBRef */ - SMALLUNSGN Flags; /* always required */ -#if PARALLEL_YAP - rwlock_t QRWLock; /* a simple lock to protect this entry */ -#endif - QueueEntry *FirstInQueue, *LastInQueue; -} db_queue; - #define HashFieldMask ((CELL)0xffL) #define DualHashFieldMask ((CELL)0xffffL) #define TripleHashFieldMask ((CELL)0xffffffL) @@ -5184,6 +5169,108 @@ Yap_StoreTermInDBPlusExtraSpace(Term t, UInt extra_size, UInt *sz) { return o; } +void +Yap_init_tqueue( db_queue *dbq ) +{ + dbq->id = FunctorDBRef; + dbq->Flags = DBClMask; + dbq->FirstInQueue = dbq->LastInQueue = NULL; + INIT_RWLOCK(dbq->QRWLock); +} + +void +Yap_destroy_tqueue( db_queue *dbq USES_REGS) +{ + QueueEntry * cur_instance = dbq->FirstInQueue; + while (cur_instance) { + /* release space for cur_instance */ + keepdbrefs(cur_instance->DBT PASS_REGS); + ErasePendingRefs(cur_instance->DBT PASS_REGS); + FreeDBSpace((char *) cur_instance->DBT); + FreeDBSpace((char *) cur_instance); + } + dbq->FirstInQueue = + dbq->LastInQueue = NULL; + +} + +bool +Yap_enqueue_tqueue(db_queue *father_key, Term t USES_REGS) +{ + QueueEntry *x; + while ((x = (QueueEntry *)AllocDBSpace(sizeof(QueueEntry))) == NULL) { + if (!Yap_growheap(FALSE, sizeof(QueueEntry), NULL)) { + Yap_Error(OUT_OF_HEAP_ERROR, TermNil, "in findall"); + return false; + } + } + /* Yap_LUClauseSpace += sizeof(QueueEntry); */ + x->DBT = StoreTermInDB(Deref(t), 2 PASS_REGS); + if (x->DBT == NULL) { + return false; + } + x->next = NULL; + if (father_key->LastInQueue != NULL) + father_key->LastInQueue->next = x; + father_key->LastInQueue = x; + if (father_key->FirstInQueue == NULL) { + father_key->FirstInQueue = x; + } + return true; + +} + +bool +Yap_dequeue_tqueue(db_queue *father_key, Term t, bool first, bool release USES_REGS) +{ + Term TDB; + QueueEntry * cur_instance = father_key->FirstInQueue, *prev = NULL; + while (cur_instance) { + while ((TDB = GetDBTerm(cur_instance->DBT, false PASS_REGS)) == 0L) { + if (LOCAL_Error_TYPE == OUT_OF_ATTVARS_ERROR) { + LOCAL_Error_TYPE = YAP_NO_ERROR; + if (!Yap_growglobal(NULL)) { + Yap_Error(OUT_OF_ATTVARS_ERROR, TermNil, LOCAL_ErrorMessage); + return false; + } + } else { + LOCAL_Error_TYPE = YAP_NO_ERROR; + if (!Yap_gcl(LOCAL_Error_Size, 2, ENV, gc_P(P,CP))) { + Yap_Error(OUT_OF_STACK_ERROR, TermNil, LOCAL_ErrorMessage); + return false; + } + } + } + if (Yap_unify(t, TDB)) { + if (release) { + if (cur_instance == father_key->FirstInQueue) { + father_key->FirstInQueue = cur_instance->next; + } + if (cur_instance == father_key->LastInQueue) { + father_key->LastInQueue = prev; + } + if (prev) { + prev->next = cur_instance->next; + } + /* release space for cur_instance */ + keepdbrefs(cur_instance->DBT PASS_REGS); + ErasePendingRefs(cur_instance->DBT PASS_REGS); + FreeDBSpace((char *) cur_instance->DBT); + FreeDBSpace((char *) cur_instance); + } + return true; + } else { + // just getting the first + if (first) + return false; + // but keep on going, if we want to check everything. + prev = cur_instance; + cur_instance = cur_instance->next; + } + } + return false; + +} static Int p_init_queue( USES_REGS1 ) @@ -5198,10 +5285,7 @@ p_init_queue( USES_REGS1 ) } } /* Yap_LUClauseSpace += sizeof(db_queue); */ - dbq->id = FunctorDBRef; - dbq->Flags = DBClMask; - dbq->FirstInQueue = dbq->LastInQueue = NULL; - INIT_RWLOCK(dbq->QRWLock); + Yap_init_tqueue( dbq ); t = MkIntegerTerm((Int)dbq); return Yap_unify(ARG1, t); } @@ -5211,8 +5295,8 @@ static Int p_enqueue( USES_REGS1 ) { Term Father = Deref(ARG1); - QueueEntry *x; db_queue *father_key; + bool rc; if (IsVarTerm(Father)) { Yap_Error(INSTANTIATION_ERROR, Father, "enqueue"); @@ -5222,34 +5306,16 @@ p_enqueue( USES_REGS1 ) return FALSE; } else father_key = (db_queue *)IntegerOfTerm(Father); - while ((x = (QueueEntry *)AllocDBSpace(sizeof(QueueEntry))) == NULL) { - if (!Yap_growheap(FALSE, sizeof(QueueEntry), NULL)) { - Yap_Error(OUT_OF_HEAP_ERROR, TermNil, "in findall"); - return FALSE; - } - } - /* Yap_LUClauseSpace += sizeof(QueueEntry); */ - x->DBT = StoreTermInDB(Deref(ARG2), 2 PASS_REGS); - if (x->DBT == NULL) { - return FALSE; - } - x->next = NULL; WRITE_LOCK(father_key->QRWLock); - if (father_key->LastInQueue != NULL) - father_key->LastInQueue->next = x; - father_key->LastInQueue = x; - if (father_key->FirstInQueue == NULL) { - father_key->FirstInQueue = x; - } + rc = Yap_enqueue_tqueue(father_key, Deref(ARG2) PASS_REGS); WRITE_UNLOCK(father_key->QRWLock); - return TRUE; + return rc; } static Int p_enqueue_unlocked( USES_REGS1 ) { Term Father = Deref(ARG1); - QueueEntry *x; db_queue *father_key; if (IsVarTerm(Father)) { @@ -5260,25 +5326,7 @@ p_enqueue_unlocked( USES_REGS1 ) return FALSE; } else father_key = (db_queue *)IntegerOfTerm(Father); - while ((x = (QueueEntry *)AllocDBSpace(sizeof(QueueEntry))) == NULL) { - if (!Yap_growheap(FALSE, sizeof(QueueEntry), NULL)) { - Yap_Error(OUT_OF_HEAP_ERROR, TermNil, "in findall"); - return FALSE; - } - } - /* Yap_LUClauseSpace += sizeof(QueueEntry); */ - x->DBT = StoreTermInDB(Deref(ARG2), 2 PASS_REGS); - if (x->DBT == NULL) { - return FALSE; - } - x->next = NULL; - if (father_key->LastInQueue != NULL) - father_key->LastInQueue->next = x; - father_key->LastInQueue = x; - if (father_key->FirstInQueue == NULL) { - father_key->FirstInQueue = x; - } - return TRUE; + return Yap_enqueue_tqueue(father_key, Deref(ARG2) PASS_REGS); } /* when reading an entry in the data base we are making it accessible from @@ -5301,14 +5349,14 @@ keepdbrefs(DBTerm *entryref USES_REGS) return; } while ((ref = *--cp) != NIL) { - if (!(ref->Flags & LogUpdMask)) { - LOCK(ref->lock); - if(!(ref->Flags & InUseMask)) { - ref->Flags |= InUseMask; - TRAIL_REF(ref); /* So that fail will erase it */ + if (!(ref->Flags & LogUpdMask)) { + LOCK(ref->lock); + if(!(ref->Flags & InUseMask)) { + ref->Flags |= InUseMask; + TRAIL_REF(ref); /* So that fail will erase it */ + } + UNLOCK(ref->lock); } - UNLOCK(ref->lock); - } } } @@ -5321,47 +5369,28 @@ p_dequeue( USES_REGS1 ) Term Father = Deref(ARG1); if (IsVarTerm(Father)) { - Yap_Error(INSTANTIATION_ERROR, Father, "dequeue"); - return FALSE; + Yap_Error(INSTANTIATION_ERROR, Father, "dequeue"); + return FALSE; } else if (!IsIntegerTerm(Father)) { - Yap_Error(TYPE_ERROR_INTEGER, Father, "dequeue"); - return FALSE; - } else - father_key = (db_queue *)IntegerOfTerm(Father); - WRITE_LOCK(father_key->QRWLock); - if ((cur_instance = father_key->FirstInQueue) == NULL) { - /* an empty queue automatically goes away */ - WRITE_UNLOCK(father_key->QRWLock); - FreeDBSpace((char *)father_key); - return FALSE; + Yap_Error(TYPE_ERROR_INTEGER, Father, "dequeue"); + return FALSE; } else { - Term TDB; - if (cur_instance == father_key->LastInQueue) - father_key->FirstInQueue = father_key->LastInQueue = NULL; - else - father_key->FirstInQueue = cur_instance->next; - WRITE_UNLOCK(father_key->QRWLock); - while ((TDB = GetDBTerm(cur_instance->DBT, FALSE PASS_REGS)) == 0L) { - if (LOCAL_Error_TYPE == OUT_OF_ATTVARS_ERROR) { - LOCAL_Error_TYPE = YAP_NO_ERROR; - if (!Yap_growglobal(NULL)) { - Yap_Error(OUT_OF_ATTVARS_ERROR, TermNil, LOCAL_ErrorMessage); + father_key = (db_queue *)IntegerOfTerm(Father); + WRITE_LOCK(father_key->QRWLock); + if ((cur_instance = father_key->FirstInQueue) == NULL) { + /* an empty queue automatically goes away */ + WRITE_UNLOCK(father_key->QRWLock); + FreeDBSpace((char *)father_key); return FALSE; - } - } else { - LOCAL_Error_TYPE = YAP_NO_ERROR; - if (!Yap_gcl(LOCAL_Error_Size, 2, ENV, gc_P(P,CP))) { - Yap_Error(OUT_OF_STACK_ERROR, TermNil, LOCAL_ErrorMessage); - return FALSE; - } } - } - /* release space for cur_instance */ - keepdbrefs(cur_instance->DBT PASS_REGS); - ErasePendingRefs(cur_instance->DBT PASS_REGS); - FreeDBSpace((char *) cur_instance->DBT); - FreeDBSpace((char *) cur_instance); - return Yap_unify(ARG2, TDB); + if (!Yap_dequeue_tqueue(father_key, ARG2, true, true PASS_REGS) ) + return FALSE; + if (cur_instance == father_key->LastInQueue) + father_key->FirstInQueue = father_key->LastInQueue = NULL; + else + father_key->FirstInQueue = cur_instance->next; + WRITE_UNLOCK(father_key->QRWLock); + return TRUE; } } @@ -5370,58 +5399,30 @@ static Int p_dequeue_unlocked( USES_REGS1 ) { db_queue *father_key; - QueueEntry *cur_instance, *prev_instance; + QueueEntry *cur_instance; Term Father = Deref(ARG1); if (IsVarTerm(Father)) { - Yap_Error(INSTANTIATION_ERROR, Father, "dequeue"); - return FALSE; + Yap_Error(INSTANTIATION_ERROR, Father, "dequeue"); + return FALSE; } else if (!IsIntegerTerm(Father)) { - Yap_Error(TYPE_ERROR_INTEGER, Father, "dequeue"); - return FALSE; - } else - father_key = (db_queue *)IntegerOfTerm(Father); - prev_instance = NULL; - cur_instance = father_key->FirstInQueue; - while (cur_instance) { - Term TDB; - while ((TDB = GetDBTerm(cur_instance->DBT, FALSE PASS_REGS)) == 0L) { - if (LOCAL_Error_TYPE == OUT_OF_ATTVARS_ERROR) { - LOCAL_Error_TYPE = YAP_NO_ERROR; - if (!Yap_growglobal(NULL)) { - Yap_Error(OUT_OF_ATTVARS_ERROR, TermNil, LOCAL_ErrorMessage); + Yap_Error(TYPE_ERROR_INTEGER, Father, "dequeue"); + return FALSE; + } else { + father_key = (db_queue *)IntegerOfTerm(Father); + if ((cur_instance = father_key->FirstInQueue) == NULL) { + /* an empty queue automatically goes away */ + FreeDBSpace((char *)father_key); return FALSE; - } - } else { - LOCAL_Error_TYPE = YAP_NO_ERROR; - if (!Yap_gcl(LOCAL_Error_Size, 2, ENV, gc_P(P,CP))) { - Yap_Error(OUT_OF_STACK_ERROR, TermNil, LOCAL_ErrorMessage); - return FALSE; - } } - } - if (Yap_unify(ARG2, TDB)) { - if (prev_instance) { - prev_instance->next = cur_instance->next; - if (father_key->LastInQueue == cur_instance) - father_key->LastInQueue = prev_instance; - } else if (cur_instance == father_key->LastInQueue) + if (!Yap_dequeue_tqueue(father_key, ARG2, true, true PASS_REGS) ) + return FALSE; + if (cur_instance == father_key->LastInQueue) father_key->FirstInQueue = father_key->LastInQueue = NULL; else father_key->FirstInQueue = cur_instance->next; - /* release space for cur_instance */ - keepdbrefs(cur_instance->DBT PASS_REGS); - ErasePendingRefs(cur_instance->DBT PASS_REGS); - FreeDBSpace((char *) cur_instance->DBT); - FreeDBSpace((char *) cur_instance); return TRUE; - } else { - prev_instance = cur_instance; - cur_instance = cur_instance->next; - } } - /* an empty queue automatically goes away */ - return FALSE; } static Int @@ -5432,41 +5433,29 @@ p_peek_queue( USES_REGS1 ) Term Father = Deref(ARG1); if (IsVarTerm(Father)) { - Yap_Error(INSTANTIATION_ERROR, Father, "dequeue"); - return FALSE; + Yap_Error(INSTANTIATION_ERROR, Father, "dequeue"); + return FALSE; } else if (!IsIntegerTerm(Father)) { - Yap_Error(TYPE_ERROR_INTEGER, Father, "dequeue"); - return FALSE; - } else - father_key = (db_queue *)IntegerOfTerm(Father); - cur_instance = father_key->FirstInQueue; - while (cur_instance) { - Term TDB; - while ((TDB = GetDBTerm(cur_instance->DBT, FALSE PASS_REGS)) == 0L) { - if (LOCAL_Error_TYPE == OUT_OF_ATTVARS_ERROR) { - LOCAL_Error_TYPE = YAP_NO_ERROR; - if (!Yap_growglobal(NULL)) { - Yap_Error(OUT_OF_ATTVARS_ERROR, TermNil, LOCAL_ErrorMessage); + Yap_Error(TYPE_ERROR_INTEGER, Father, "dequeue"); + return FALSE; + } else { + father_key = (db_queue *)IntegerOfTerm(Father); + if ((cur_instance = father_key->FirstInQueue) == NULL) { + /* an empty queue automatically goes away */ + FreeDBSpace((char *)father_key); return FALSE; - } - } else { - LOCAL_Error_TYPE = YAP_NO_ERROR; - if (!Yap_gcl(LOCAL_Error_Size, 2, ENV, gc_P(P,CP))) { - Yap_Error(OUT_OF_STACK_ERROR, TermNil, LOCAL_ErrorMessage); - return FALSE; - } } - } - if (Yap_unify(ARG2, TDB)) { + if (!Yap_dequeue_tqueue(father_key, ARG2, true, false PASS_REGS) ) + return FALSE; + if (cur_instance == father_key->LastInQueue) + father_key->FirstInQueue = father_key->LastInQueue = NULL; + else + father_key->FirstInQueue = cur_instance->next; return TRUE; - } - cur_instance = cur_instance->next; } - return FALSE; } - static Int p_clean_queues( USES_REGS1 ) { diff --git a/C/globals.c b/C/globals.c index 1a1f4d33c..9242f995a 100644 --- a/C/globals.c +++ b/C/globals.c @@ -190,10 +190,11 @@ CreateNewArena(CELL *ptr, UInt size) } static Term -NewArena(UInt size, UInt arity, CELL *where USES_REGS) +NewArena(UInt size, int wid, UInt arity, CELL *where) { Term t; UInt new_size; + WORKER_REGS(wid) if (where == NULL || where == HR) { while (HR+size > ASP-1024) { @@ -226,7 +227,7 @@ p_allocate_arena( USES_REGS1 ) Yap_Error(TYPE_ERROR_INTEGER,t,"allocate_arena"); return FALSE; } - return Yap_unify(ARG2,NewArena(IntegerOfTerm(t), 1, NULL PASS_REGS)); + return Yap_unify(ARG2,NewArena(IntegerOfTerm(t), worker_id, 1, NULL)); } @@ -240,8 +241,7 @@ p_default_arena_size( USES_REGS1 ) void Yap_AllocateDefaultArena(Int gsize, Int attsize, int wid) { - CACHE_REGS - REMOTE_GlobalArena(wid) = NewArena(gsize, 2, NULL PASS_REGS); + REMOTE_GlobalArena(wid) = NewArena(gsize, wid, 2, NULL); } static void @@ -1579,7 +1579,7 @@ nb_queue(UInt arena_sz USES_REGS) return FALSE; if (arena_sz < 4*1024) arena_sz = 4*1024; - queue_arena = NewArena(arena_sz, 1, NULL PASS_REGS); + queue_arena = NewArena(arena_sz, worker_id, 1, NULL); if (queue_arena == 0L) { return FALSE; } @@ -1933,7 +1933,7 @@ p_nb_heap( USES_REGS1 ) ar[HEAP_MAX] = tsize; if (arena_sz < 1024) arena_sz = 1024; - heap_arena = NewArena(arena_sz,1,NULL PASS_REGS); + heap_arena = NewArena(arena_sz,worker_id,1,NULL); if (heap_arena == 0L) { return FALSE; } @@ -2215,7 +2215,7 @@ p_nb_beam( USES_REGS1 ) ar[HEAP_MAX] = tsize; if (arena_sz < 1024) arena_sz = 1024; - beam_arena = NewArena(arena_sz,1,NULL PASS_REGS); + beam_arena = NewArena(arena_sz,worker_id,1,NULL); if (beam_arena == 0L) { return FALSE; } diff --git a/C/init.c b/C/init.c index fdfde20c2..0676716e3 100755 --- a/C/init.c +++ b/C/init.c @@ -1183,6 +1183,22 @@ InitThreadHandle(int wid) pthread_mutex_init(&(REMOTE_ThreadHandle(wid).tlock_status), NULL); REMOTE_ThreadHandle(wid).tdetach = (CELL)0; REMOTE_ThreadHandle(wid).cmod = (CELL)0; + { + mbox_t * mboxp = &REMOTE_ThreadHandle(wid).mbox_handle; + pthread_mutex_t *mutexp; + pthread_cond_t *condp; + struct idb_queue *msgsp; + + mboxp->name = MkIntTerm(0); + condp = & mboxp->cond; + pthread_cond_init(condp, NULL); + mutexp = & mboxp->mutex; + pthread_mutex_init(mutexp, NULL); + msgsp = & mboxp->msgs; + mboxp->nmsgs = 0; + Yap_init_tqueue(msgsp); +} + } int diff --git a/C/threads.c b/C/threads.c index ff5c577c9..8b5b1fb6f 100644 --- a/C/threads.c +++ b/C/threads.c @@ -51,6 +51,7 @@ static Int p_nodebug_locks( USES_REGS1 ) { debug_locks = 0; return TRUE; } #include "threads.h" + /* * This file includes the definition of threads in Yap. Threads * are supposed to be compatible with the SWI-Prolog thread package. @@ -130,13 +131,116 @@ allocate_new_tid(void) return new_worker_id; } +static bool +mboxCreate( Term namet, mbox_t *mboxp USES_REGS ) +{ + pthread_mutex_t *mutexp; + pthread_cond_t *condp; + struct idb_queue *msgsp; + + memset(mboxp, 0, sizeof(mbox_t)); + UNLOCK(GLOBAL_mboxq_lock); + condp = & mboxp->cond; + pthread_cond_init(condp, NULL); + mutexp = & mboxp->mutex; + pthread_mutex_init(mutexp, NULL); + msgsp = & mboxp->msgs; + mboxp->nmsgs = 0; + mboxp->nclients = 0; + Yap_init_tqueue(msgsp); + // match at the end, when everything is built. + mboxp->name = namet; +return true; +} + +static bool +mboxDestroy( mbox_t *mboxp USES_REGS ) +{ + pthread_mutex_t *mutexp = &mboxp->mutex; + pthread_cond_t *condp = &mboxp->cond; + struct idb_queue *msgsp = &mboxp->msgs; + + if (mboxp->nclients == 0) { + pthread_cond_destroy(condp); + pthread_mutex_destroy(mutexp); + Yap_destroy_tqueue(msgsp PASS_REGS); + // at this point, there is nothing left to unlock! + return true; + } else { + /* we have clients in the mailbox, try to wake them up one by one */ + if (mboxp->nclients > 0) + mboxp->nclients = - mboxp->nclients; + pthread_cond_signal(condp); + pthread_mutex_unlock(mutexp); + return true; + } +} + +static bool +mboxSend( mbox_t *mboxp, Term t USES_REGS ) +{ + pthread_mutex_t *mutexp = &mboxp->mutex; + pthread_cond_t *condp = &mboxp->cond; + struct idb_queue *msgsp = &mboxp->msgs; + + if (mboxp->nclients < 0) { + // oops, dead mailbox + return false; + } + Yap_enqueue_tqueue(msgsp, t PASS_REGS); + mboxp->nmsgs++; + pthread_cond_broadcast(condp); + pthread_mutex_unlock(mutexp); + return true; +} + +static bool +mboxReceive( mbox_t *mboxp, Term t USES_REGS ) +{ + pthread_mutex_t *mutexp = &mboxp->mutex; + pthread_cond_t *condp = &mboxp->cond; + struct idb_queue *msgsp = &mboxp->msgs; + bool rc; + + if (mboxp->nclients >= 0) { + mboxp->nclients++; + } else { + return false; // don't try to read if someone else already closed down... + } + do { + rc = Yap_dequeue_tqueue(msgsp, t, false, true PASS_REGS); + if (rc) { + mboxp->nmsgs--; + if (mboxp->nclients > 0) { + mboxp->nclients--; + } else { + mboxp->nclients++; + return mboxDestroy( mboxp PASS_REGS); + } + pthread_mutex_unlock(mutexp); + } else { + pthread_cond_wait(condp, mutexp); + } + } while (!rc); + return rc; +} + +static bool +mboxPeek( mbox_t *mboxp, Term t USES_REGS ) +{ + pthread_mutex_t *mutexp = &mboxp->mutex; + struct idb_queue *msgsp = &mboxp->msgs; + bool rc = Yap_dequeue_tqueue(msgsp, t, false, false PASS_REGS); + pthread_mutex_unlock(mutexp); + return rc; +} + static int -store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term *tpgoal, Term *tpdetach, Term *tpexit) +store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term tgoal, Term tdetach, Term texit) { CACHE_REGS UInt pm; /* memory to be requested */ Term tmod; - Term tdetach, tgoal; if (tsize < MinTrailSpace) tsize = MinTrailSpace; @@ -161,7 +265,7 @@ store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term *tpgoal return FALSE; } REMOTE_ThreadHandle(new_worker_id).tgoal = - Yap_StoreTermInDB(Deref(*tpgoal), 7); + Yap_StoreTermInDB(Deref(tgoal), 7); if (CurrentModule) { REMOTE_ThreadHandle(new_worker_id).cmod = @@ -169,7 +273,7 @@ store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term *tpgoal } else { REMOTE_ThreadHandle(new_worker_id).cmod = USER_MODULE; } - tdetach = Deref(*tpdetach); + tdetach = Deref(tdetach); if (IsVarTerm(tdetach)){ REMOTE_ThreadHandle(new_worker_id).tdetach = MkAtomTerm(AtomFalse); @@ -177,10 +281,10 @@ store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term *tpgoal REMOTE_ThreadHandle(new_worker_id).tdetach = tdetach; } - tgoal = Yap_StripModule(Deref(*tpexit), &tmod); + texit = Yap_StripModule(Deref(texit), &tmod); REMOTE_ThreadHandle(new_worker_id).texit_mod = tmod; REMOTE_ThreadHandle(new_worker_id).texit = - Yap_StoreTermInDB(tgoal,7); + Yap_StoreTermInDB(texit,7); REMOTE_ThreadHandle(new_worker_id).local_preds = NULL; REMOTE_ThreadHandle(new_worker_id).start_of_timesp = @@ -189,6 +293,8 @@ store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term *tpgoal NULL; REMOTE_ScratchPad(new_worker_id).ptr = NULL; + // reset arena info + REMOTE_GlobalArena(new_worker_id) =0; return TRUE; } @@ -267,6 +373,8 @@ setup_engine(int myworker_id, int init_thread) REMOTE_PL_local_data_p(myworker_id)->reg_cache = standard_regs; Yap_InitExStacks(myworker_id, REMOTE_ThreadHandle(myworker_id).tsize, REMOTE_ThreadHandle(myworker_id).ssize); REMOTE_SourceModule(myworker_id) = CurrentModule = REMOTE_ThreadHandle(myworker_id).cmod; + // create a mbox + mboxCreate( MkIntTerm(myworker_id), &REMOTE_ThreadHandle(myworker_id).mbox_handle PASS_REGS ); Yap_InitTime( myworker_id ); Yap_InitYaamRegs( myworker_id ); REFRESH_CACHE_REGS @@ -384,7 +492,7 @@ p_thread_new_tid( USES_REGS1 ) } static int -init_thread_engine(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term *tgoal, Term *tdetach, Term *texit) +init_thread_engine(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term tgoal, Term tdetach, Term texit) { return store_specs(new_worker_id, ssize, tsize, sysize, tgoal, tdetach, texit); } @@ -415,7 +523,7 @@ p_create_thread( USES_REGS1 ) return FALSE; } /* make sure we can proceed */ - if (!init_thread_engine(new_worker_id, ssize, tsize, sysize, &ARG1, &ARG5, &ARG6)) + if (!init_thread_engine(new_worker_id, ssize, tsize, sysize, ARG1, ARG5, ARG6)) return FALSE; //REMOTE_ThreadHandle(new_worker_id).pthread_handle = 0L; REMOTE_ThreadHandle(new_worker_id).id = new_worker_id; @@ -547,7 +655,7 @@ Yap_thread_create_engine(YAP_thread_attr *ops) pthread_setspecific(Yap_yaamregs_key, (const void *)&Yap_standard_regs); MUTEX_LOCK(&(REMOTE_ThreadHandle(0).tlock)); } - if (!init_thread_engine(new_id, ops->ssize, ops->tsize, ops->sysize, &t, &t, &(ops->egoal))) + if (!init_thread_engine(new_id, ops->ssize, ops->tsize, ops->sysize, t, t, (ops->egoal))) return -1; //REMOTE_ThreadHandle(new_id).pthread_handle = 0L; REMOTE_ThreadHandle(new_id).id = new_id; @@ -848,6 +956,127 @@ p_cond_create( USES_REGS1 ) return Yap_unify(ARG1, MkIntegerTerm((Int)condp)); } + +static Int +p_mbox_create( USES_REGS1 ) +{ + Term namet = Deref(ARG1); + mbox_t* mboxp = GLOBAL_named_mboxes; + + if (IsVarTerm(namet)) { + char buf[256]; + sprintf(buf, "$%p", mboxp); + namet = MkAtomTerm(Yap_FullLookupAtom(buf)); + Yap_unify(ARG1, namet); + } + if (IsAtomTerm(namet)) { + LOCK(GLOBAL_mboxq_lock); + while( mboxp && mboxp->name != namet) + mboxp = mboxp->next; + if (mboxp) { + UNLOCK(GLOBAL_mboxq_lock); + return FALSE; + } + mboxp = (mbox_t *)Yap_AllocCodeSpace(sizeof(mbox_t)); + if (mboxp == NULL) { + UNLOCK(GLOBAL_mboxq_lock); + return FALSE; + } + // global mbox, for now we'll just insert in list + mboxp->next = GLOBAL_named_mboxes; + GLOBAL_named_mboxes = mboxp; + } + return mboxCreate( namet, mboxp PASS_REGS ); +} + +static Int +p_mbox_destroy( USES_REGS1 ) +{ + Term namet = Deref(ARG1); + mbox_t* mboxp = GLOBAL_named_mboxes, *prevp; + + if (IsVarTerm(namet) || !IsAtomTerm(namet)) + return FALSE; + LOCK(GLOBAL_mboxq_lock); + prevp = NULL; + while( mboxp && mboxp->name != namet) { + prevp = mboxp; + mboxp = mboxp->next; + } + if (!mboxp) { + UNLOCK(GLOBAL_mboxq_lock); + return FALSE; + } + if (mboxp == GLOBAL_named_mboxes) { + GLOBAL_named_mboxes = mboxp->next; + } else { + prevp->next = mboxp->next; + } + UNLOCK(GLOBAL_mboxq_lock); + mboxDestroy(mboxp PASS_REGS); + Yap_FreeCodeSpace( (char *)mboxp ); + return TRUE; +} + +static mbox_t* +getMbox(Term t) +{ + mbox_t* mboxp; + if (IsAtomTerm(t)) { + LOCK(GLOBAL_mboxq_lock); + mboxp = GLOBAL_named_mboxes; + while( mboxp && mboxp->name != t) { + mboxp = mboxp->next; + } + } else if (IsIntTerm(t)) { + int wid = IntOfTerm(t); + if (REMOTE(wid) && + (REMOTE_ThreadHandle(wid).in_use || REMOTE_ThreadHandle(wid).zombie)) + { + return &REMOTE_ThreadHandle(wid).mbox_handle; + } else { + return NULL; + } + } else { + return NULL; + } + if (mboxp) { + pthread_mutex_lock(& mboxp->mutex); + } + UNLOCK(GLOBAL_mboxq_lock); + return mboxp; +} + + +static Int +p_mbox_send( USES_REGS1 ) +{ + Term namet = Deref(ARG1); + mbox_t* mboxp = getMbox(namet) ; + + return mboxSend(mboxp, Deref(ARG2) PASS_REGS); +} + + +static Int +p_mbox_receive( USES_REGS1 ) +{ + Term namet = Deref(ARG1); + mbox_t* mboxp = getMbox(namet) ; + + return mboxReceive(mboxp, Deref(ARG2) PASS_REGS); +} + + +static Int +p_mbox_peek( USES_REGS1 ) +{ + Term namet = Deref(ARG1); + mbox_t* mboxp = getMbox(namet) ; + + return mboxPeek(mboxp, Deref(ARG2) PASS_REGS); +} + static Int p_cond_destroy( USES_REGS1 ) { @@ -884,9 +1113,8 @@ p_cond_wait( USES_REGS1 ) { pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1)); SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG2)); - pthread_cond_wait(condp, &mut->m); - return TRUE; + return TRUE; } static Int @@ -1112,6 +1340,11 @@ and succeeds silently. Yap_InitCPred("$cond_signal", 1, p_cond_signal, SafePredFlag); Yap_InitCPred("$cond_broadcast", 1, p_cond_broadcast, SafePredFlag); Yap_InitCPred("$cond_wait", 2, p_cond_wait, SafePredFlag); + Yap_InitCPred("$message_queue_create", 1, p_mbox_create, SafePredFlag); + Yap_InitCPred("$message_queue_destroy", 1, p_mbox_destroy, SafePredFlag); + Yap_InitCPred("$message_queue_send", 2, p_mbox_send, SafePredFlag); + Yap_InitCPred("$message_queue_receive", 2, p_mbox_receive, SafePredFlag); + Yap_InitCPred("$message_queue_peek", 2, p_mbox_peek, SafePredFlag); Yap_InitCPred("$thread_stacks", 4, p_thread_stacks, SafePredFlag); Yap_InitCPred("$signal_thread", 1, p_thread_signal, SafePredFlag); Yap_InitCPred("$nof_threads", 1, p_nof_threads, SafePredFlag); diff --git a/C/tracer.c b/C/tracer.c index ca6c1548b..e5f8ff626 100644 --- a/C/tracer.c +++ b/C/tracer.c @@ -133,7 +133,7 @@ check_area(void) PredEntry *old_p[10000]; Term old_x1[10000], old_x2[10000], old_x3[10000]; -static CELL oldv; +//static CELL oldv; void low_level_trace(yap_low_level_port port, PredEntry *pred, CELL *args) diff --git a/H/Regs.h b/H/Regs.h index 5ed16c079..2f1ff1f61 100755 --- a/H/Regs.h +++ b/H/Regs.h @@ -85,6 +85,7 @@ INLINE_ONLY inline EXTERN void save_B(void); #define PASS_REGS #define USES_REGS1 void #define USES_REGS +#define WORKER_REGS(WID) typedef struct regstore_t { @@ -179,13 +180,16 @@ extern pthread_key_t Yap_yaamregs_key; #undef CACHE_REGS #undef REFRESH_CACHE_REGS #undef INIT_REGS +#undef CACHE_REGS #undef PASS_REGS #undef PASS_REGS1 #undef USES_REGS #undef USES_REGS1 +#undef WORKER_REGS #define CACHE_REGS REGSTORE *regcache = ((REGSTORE *)pthread_getspecific(Yap_yaamregs_key)); #define REFRESH_CACHE_REGS regcache = ((REGSTORE *)pthread_getspecific(Yap_yaamregs_key)); #define INIT_REGS , ((REGSTORE *)pthread_getspecific(Yap_yaamregs_key)) +#define WORKER_REGS(WID) REGSTORE *regcache = REMOTE_ThreadHandle(WID).current_yaam_regs; #define PASS_REGS1 regcache #define PASS_REGS , regcache #define USES_REGS1 struct regstore_t *regcache diff --git a/H/Yap.h b/H/Yap.h index 608956111..a9bf28041 100755 --- a/H/Yap.h +++ b/H/Yap.h @@ -652,7 +652,39 @@ typedef enum /************************/ + // queues are an example of collections of DB objects + typedef struct queue_entry { + struct queue_entry *next; + struct DB_TERM *DBT; + } QueueEntry; + + typedef struct idb_queue + { + struct FunctorEntryStruct *id; /* identify this as being pointed to by a DBRef */ + SMALLUNSGN Flags; /* always required */ + #if PARALLEL_YAP + rwlock_t QRWLock; /* a simple lock to protect this entry */ + #endif + QueueEntry *FirstInQueue, *LastInQueue; + } db_queue; + +void Yap_init_tqueue( db_queue *dbq ); +void Yap_destroy_tqueue( db_queue *dbq USES_REGS); +bool Yap_enqueue_tqueue(db_queue *father_key, Term t USES_REGS); +bool Yap_dequeue_tqueue(db_queue *father_key, Term t, bool first, bool release USES_REGS); + #ifdef THREADS + + +typedef struct thread_mbox { + Term name; + pthread_mutex_t mutex; + pthread_cond_t cond; + struct idb_queue msgs; + int nmsgs, nclients; // if nclients < 0 mailbox has been closed. + struct thread_mbox *next; +} mbox_t; + typedef struct thandle { int in_use; int zombie; @@ -669,6 +701,7 @@ typedef struct thandle { REGSTORE *current_yaam_regs; struct pred_entry *local_preds; pthread_t pthread_handle; + mbox_t mbox_handle; int ref_count; #ifdef LOW_LEVEL_TRACER long long int thread_inst_count; diff --git a/H/Yatom.h b/H/Yatom.h index 7ab9dcc44..22675062d 100755 --- a/H/Yatom.h +++ b/H/Yatom.h @@ -904,7 +904,7 @@ typedef struct DB_STRUCT #define DBStructFlagsToDBStruct(X) ((DBRef)((char *)(X) - (CELL) &(((DBRef) NULL)->Flags))) -#if MULTIPLE_STAACKS +#if MULTIPLE_STACKS #define INIT_DBREF_COUNT(X) (X)->ref_count = 0 #define INC_DBREF_COUNT(X) (X)->ref_count++ #define DEC_DBREF_COUNT(X) (X)->ref_count-- diff --git a/H/dglobals.h b/H/dglobals.h index fa36be560..2abfde968 100644 --- a/H/dglobals.h +++ b/H/dglobals.h @@ -61,6 +61,8 @@ #if defined(THREADS) #define GLOBAL_master_thread Yap_global->master_thread_ +#define GLOBAL_named_mboxes Yap_global->named_mboxes_ +#define GLOBAL_mboxq_lock Yap_global->mboxq_lock_ #endif /* THREADS */ #define GLOBAL_stdout Yap_global->stdout_ diff --git a/H/hglobals.h b/H/hglobals.h index 059e2c76a..eea32423f 100644 --- a/H/hglobals.h +++ b/H/hglobals.h @@ -61,6 +61,8 @@ typedef struct global_data { #if defined(THREADS) pthread_t master_thread_; + struct thread_mbox* named_mboxes_; + lockvar mboxq_lock_; #endif /* THREADS */ struct io_stream* stdout_; diff --git a/H/iglobals.h b/H/iglobals.h index 885c2b7c4..67e8d3d0f 100644 --- a/H/iglobals.h +++ b/H/iglobals.h @@ -61,6 +61,8 @@ static void InitGlobal(void) { #if defined(THREADS) + GLOBAL_named_mboxes = NULL; + INIT_LOCK(GLOBAL_mboxq_lock); #endif /* THREADS */ GLOBAL_stdout = Soutput; diff --git a/H/rglobals.h b/H/rglobals.h index b881b781b..dce458f87 100644 --- a/H/rglobals.h +++ b/H/rglobals.h @@ -61,6 +61,8 @@ static void RestoreGlobal(void) { #if defined(THREADS) + + REINIT_LOCK(GLOBAL_mboxq_lock); #endif /* THREADS */ diff --git a/misc/GLOBALS b/misc/GLOBALS index 0064c4995..8c503d1f4 100755 --- a/misc/GLOBALS +++ b/misc/GLOBALS @@ -69,6 +69,8 @@ int PrologShouldHandleInterrupts void /* This is the guy who actually started the system, and who has the correct registers */ #if defined(THREADS) pthread_t master_thread void +struct thread_mbox* named_mboxes =NULL +lockvar mboxq_lock MkLock #endif /* THREADS */ // streams diff --git a/misc/buildlocalglobal b/misc/buildlocalglobal index 4ae0d95c1..60ae3cb81 100644 --- a/misc/buildlocalglobal +++ b/misc/buildlocalglobal @@ -1,6 +1,6 @@ :- use_module(library(lineutils), - [file_filter_with_init/5, + [file_filter_with_initialization/5, split/3, glue/3]). @@ -18,18 +18,18 @@ main :- warning(Warning), - %file_filter_with_init('misc/HEAPFIELDS','H/hstruct.h',gen_struct,Warning,['hstruct.h','HEAPFIELDS']), - %file_filter_with_init('misc/HEAPFIELDS','H/dhstruct.h',gen_dstruct,Warning,['dhstruct.h','HEAPFIELDS']), - %file_filter_with_init('misc/HEAPFIELDS','H/rhstruct.h',gen_hstruct,Warning,['rhstruct.h','HEAPFIELDS']), - %file_filter_with_init('misc/HEAPFIELDS','H/ihstruct.h',gen_init,Warning,['ihstruct.h','HEAPFIELDS']), - file_filter_with_init('misc/GLOBALS','H/hglobals.h',gen_struct,Warning,['hglobals.h','GLOBALS']), - file_filter_with_init('misc/GLOBALS','H/dglobals.h',gen_dstruct,Warning,['dglobals.h','GLOBALS']), - file_filter_with_init('misc/GLOBALS','H/rglobals.h',gen_hstruct,Warning,['rglobals.h','GLOBALS']), - file_filter_with_init('misc/GLOBALS','H/iglobals.h',gen_init,Warning,['iglobals.h','GLOBALS']), - file_filter_with_init('misc/LOCALS','H/hlocals.h',gen_struct,Warning,['hlocals.h','LOCALS']), - file_filter_with_init('misc/LOCALS','H/dlocals.h',gen_dstruct,Warning,['dlocals.h','LOCALS']), - file_filter_with_init('misc/LOCALS','H/rlocals.h',gen_hstruct,Warning,['rlocals.h','LOCALS']), - file_filter_with_init('misc/LOCALS','H/ilocals.h',gen_init,Warning,['ilocals.h','LOCALS']). + %file_filter_with_initialization('misc/HEAPFIELDS','H/hstruct.h',gen_struct,Warning,['hstruct.h','HEAPFIELDS']), + %file_filter_with_initialization('misc/HEAPFIELDS','H/dhstruct.h',gen_dstruct,Warning,['dhstruct.h','HEAPFIELDS']), + %file_filter_with_initialization('misc/HEAPFIELDS','H/rhstruct.h',gen_hstruct,Warning,['rhstruct.h','HEAPFIELDS']), + %file_filter_with_initialization('misc/HEAPFIELDS','H/ihstruct.h',gen_init,Warning,['ihstruct.h','HEAPFIELDS']), + file_filter_with_initialization('misc/GLOBALS','H/hglobals.h',gen_struct,Warning,['hglobals.h','GLOBALS']), + file_filter_with_initialization('misc/GLOBALS','H/dglobals.h',gen_dstruct,Warning,['dglobals.h','GLOBALS']), + file_filter_with_initialization('misc/GLOBALS','H/rglobals.h',gen_hstruct,Warning,['rglobals.h','GLOBALS']), + file_filter_with_initialization('misc/GLOBALS','H/iglobals.h',gen_init,Warning,['iglobals.h','GLOBALS']), + file_filter_with_initialization('misc/LOCALS','H/hlocals.h',gen_struct,Warning,['hlocals.h','LOCALS']), + file_filter_with_initialization('misc/LOCALS','H/dlocals.h',gen_dstruct,Warning,['dlocals.h','LOCALS']), + file_filter_with_initialization('misc/LOCALS','H/rlocals.h',gen_hstruct,Warning,['rlocals.h','LOCALS']), + file_filter_with_initialization('misc/LOCALS','H/ilocals.h',gen_init,Warning,['ilocals.h','LOCALS']). warning('~n /* This file, ~a, was generated automatically by \"yap -L misc/buildlocalglobal\"~n please do not update, update misc/~a instead */~n~n'). diff --git a/pl/threads.yap b/pl/threads.yap index 13cfd32fa..f58267caf 100644 --- a/pl/threads.yap +++ b/pl/threads.yap @@ -122,18 +122,21 @@ volatile(P) :- '$init_thread0' :- '$no_threads', !. '$init_thread0' :- - recorda('$thread_defaults', [0, 0, 0, false, true], _), - '$create_thread_mq'(0), - '$new_mutex'(Id), - assert_static(prolog:'$with_mutex_mutex'(Id)). + mutex_create(WMId), + assert_static(prolog:'$with_mutex_mutex'(WMId) ), + fail. +'$init_thread0' :- + recorda('$thread_defaults', [0, 0, 0, false, true], _). '$reinit_thread0' :- '$no_threads', !. +'$reinit_thread0' :- fail, + abolish(prolog:'$with_mutex_mutex'/1), + fail. '$reinit_thread0' :- - '$create_thread_mq'(0), -% abolish(prolog:'$with_mutex_mutex',1), - '$new_mutex'(Id), - asserta_static((prolog:'$with_mutex_mutex'(Id) :- !)). + mutex_create(WMId), + asserta_static( ( prolog:'$with_mutex_mutex'(WMId) :- !) ). + '$top_thread_goal'(G, Detached) :- '$thread_self'(Id), @@ -175,7 +178,6 @@ thread_create(Goal) :- '$thread_options'([detached(true)], [], Stack, Trail, System, Detached, AtExit, G0), '$thread_new_tid'(Id), % '$erase_thread_info'(Id), % this should not be here - '$create_thread_mq'(Id), ( '$create_thread'(Goal, Stack, Trail, System, Detached, AtExit, Id) -> @@ -199,7 +201,6 @@ thread_create(Goal, Id) :- '$thread_options'([], [], Stack, Trail, System, Detached, AtExit, G0), '$thread_new_tid'(Id), % '$erase_thread_info'(Id), % this should not be here - '$create_thread_mq'(Id), ( '$create_thread'(Goal, Stack, Trail, System, Detached, AtExit, Id) -> @@ -260,7 +261,6 @@ thread_create(Goal, Id, Options) :- '$thread_new_tid'(Id), % '$erase_thread_info'(Id), % this should not be here '$record_alias_info'(Id, Alias), - '$create_thread_mq'(Id), ( '$create_thread'(Goal, Stack, Trail, System, Detached, AtExit, Id) -> @@ -278,10 +278,6 @@ thread_create(Goal, Id, Options) :- recorded('$thread_exit_hook', [Id|_], R), erase(R), fail. -'$erase_thread_info'(Id) :- - recorded('$queue',q(Id,_,_,_,QKey),_), - '$empty_mqueue'(QKey), - fail. '$erase_thread_info'(_). @@ -1187,15 +1183,10 @@ message_queue_create(Id, [alias(Alias)]) :- message_queue_create(Id, [alias(Alias)]) :- \+ atom(Alias), !, '$do_error'(type_error(atom,Alias), message_queue_create(Id, [alias(Alias)])). -message_queue_create(Id, [alias(Alias)]) :- !, - '$new_mutex'(Mutex), - '$cond_create'(Cond), - ( recorded('$queue', q(Alias,_,_,_,_), _) -> - '$do_error'(permission_error(create,queue,alias(Alias)),message_queue_create(Id, [alias(Alias)])) - ; recorded('$thread_alias', [_|Alias], _) -> - '$do_error'(permission_error(create,queue,alias(Alias)),message_queue_create(Id, [alias(Alias)])) - ; '$mq_new_id'(Id, NId, Key), - recorda('$queue',q(Alias,Mutex,Cond,NId,Key), _) +message_queue_create(Alias, [alias(Alias)]) :- !, + ( recorded('$thread_alias', [_|Alias], _) -> + '$do_error'(permission_error(create,queue,alias(Alias)),message_queue_create(Alias, [alias(Alias)])) + ; '$message_queue_create'(Alias) ). message_queue_create(Id, [Option| _]) :- '$do_error'(domain_error(queue_option, Option), message_queue_create(Id, [Option| _])). @@ -1214,51 +1205,12 @@ created and _Queue_ is unified to its identifier. */ message_queue_create(Id) :- ( var(Id) -> % ISO DTR - message_queue_create(Id, []) + '$message_queue_create'(Id) ; atom(Id) -> % old behavior - message_queue_create(_, [alias(Id)]) + '$message_queue_create'(Id) ; '$do_error'(uninstantiation_error(Id), message_queue_create(Id)) ). -'$do_msg_queue_create'(Id) :- - \+ recorded('$queue',q(Id,_,_,_,_), _), - '$new_mutex'(Mutex), - '$cond_create'(Cond), - '$mq_new_id'(Id, NId, Key), - recorda('$queue',q(Id,Mutex,Cond,NId,Key), _), - fail. -'$do_msg_queue_create'(_). - -'$create_thread_mq'(TId) :- - recorded('$queue',q(TId,_,_,_,_), R), - erase(R), - fail. -'$create_thread_mq'(TId) :- - \+ recorded('$queue',q(TId,_,_,_,_), _), - '$new_mutex'(Mutex), - '$cond_create'(Cond), - '$mq_new_id'(TId, TId, Key), - recorda('$queue', q(TId,Mutex,Cond,TId,Key), _), - fail. -% recover space -'$create_thread_mq'(_). - -'$mq_new_id'(Id, Id, AtId) :- - integer(Id), !, - \+ recorded('$queue', q(_,_,_,Id,_), _), - '$init_db_queue'(AtId). -'$mq_new_id'(_, Id, AtId) :- - '$integers'(Id), - \+ recorded('$queue', q(_,_,_,Id,_), _), - !, - '$init_db_queue'(AtId). - -'$integers'(-1). -'$integers'(I) :- - '$integers'(I1), - I is I1-1. - - /** @pred message_queue_destroy(+ _Queue_) @@ -1277,29 +1229,6 @@ message_queue_destroy(Name) :- fail. message_queue_destroy(_). - -'$message_queue_destroy'(Queue) :- - recorded('$queue',q(Queue,Mutex,Cond,_,QKey),R), !, - '$clean_mqueue'(QKey), - '$cond_destroy'(Cond), - '$destroy_mutex'(Mutex), - erase(R). -'$message_queue_destroy'(Queue) :- - atomic(Queue), !, - '$do_error'(existence_error(message_queue,Queue),message_queue_destroy(Queue)). -'$message_queue_destroy'(Name) :- - '$do_error'(type_error(atom,Name),message_queue_destroy(Name)). - -'$clean_mqueue'(Queue) :- - '$db_dequeue'(Queue, _), - fail. -'$clean_mqueue'(_). - -'$empty_mqueue'(Queue) :- - '$db_dequeue_unlocked'(Queue, _), - fail. -'$empty_mqueue'(_). - message_queue_property(Id, Prop) :- ( nonvar(Id) -> '$check_message_queue_or_alias'(Id, message_queue_property(Id, Prop)) @@ -1368,32 +1297,15 @@ can seriously harm performance with many threads waiting on the same queue as all-but-the-winner perform a useless scan of the queue. If there is only one waiting thread or all waiting threads wait with an unbound variable an arbitrary thread is restarted to scan the queue. - - - - */ thread_send_message(Queue, Term) :- var(Queue), !, '$do_error'(instantiation_error,thread_send_message(Queue,Term)). thread_send_message(Queue, Term) :- - recorded('$thread_alias',[Id|Queue],_), !, - thread_send_message(Id, Term). + recorded('$thread_alias',[Id|Queue],_R), !, + '$message_queue_send'(Id, Term). thread_send_message(Queue, Term) :- - '$do_thread_send_message'(Queue, Term), - fail. -% release pointers -thread_send_message(_, _). - -'$do_thread_send_message'(Queue, Term) :- - recorded('$queue',q(Queue,Mutex,Cond,_,Key),_), !, - '$lock_mutex'(Mutex), - '$db_enqueue_unlocked'(Key, Term), -% write(+Queue:Term),nl, - '$cond_signal'(Cond), - '$unlock_mutex'(Mutex). -'$do_thread_send_message'(Queue, Term) :- - '$do_error'(existence_error(queue,Queue),thread_send_message(Queue,Term)). + '$message_queue_send'(Queue, Term). /** @pred thread_get_message(? _Term_) @@ -1435,24 +1347,12 @@ to check whether a thread has swallowed a message sent to it. thread_get_message(Queue, Term) :- var(Queue), !, '$do_error'(instantiation_error,thread_get_message(Queue,Term)). thread_get_message(Queue, Term) :- - recorded('$thread_alias',[Id|Queue],_), !, - thread_get_message(Id, Term). + recorded('$thread_alias',[Id|Queue],_R), !, + '$message_queue_receive'(Id, Term). thread_get_message(Queue, Term) :- - recorded('$queue',q(Queue,Mutex,Cond,_,Key),_), !, - '$lock_mutex'(Mutex), -% write(-Queue:Term),nl, - '$thread_get_message_loop'(Key, Term, Mutex, Cond). -thread_get_message(Queue, Term) :- - '$do_error'(existence_error(message_queue,Queue),thread_get_message(Queue,Term)). + '$message_queue_receive'(Queue, Term). -'$thread_get_message_loop'(Key, Term, Mutex, _) :- - '$db_dequeue_unlocked'(Key, Term), !, - '$unlock_mutex'(Mutex). -'$thread_get_message_loop'(Key, Term, Mutex, Cond) :- - '$cond_wait'(Cond, Mutex), - '$thread_get_message_loop'(Key, Term, Mutex, Cond). - /** @pred thread_peek_message(? _Term_) @@ -1514,22 +1414,10 @@ work(Id, Goal) :- thread_peek_message(Queue, Term) :- var(Queue), !, '$do_error'(instantiation_error,thread_peek_message(Queue,Term)). thread_peek_message(Queue, Term) :- - recorded('$thread_alias',[Id|Queue],_), !, - thread_peek_message(Id, Term). -thread_peek_message(Queue, Term) :- - recorded('$queue',q(Queue,Mutex,_,_,Key),_), !, - '$lock_mutex'(Mutex), - '$thread_peek_message2'(Key, Term, Mutex). -thread_peek_message(Queue, Term) :- - '$do_error'(existence_error(message_queue,Queue),thread_peek_message(Queue,Term)). - - -'$thread_peek_message2'(Key, Term, Mutex) :- - '$db_peek_queue'(Key, Term), !, - '$unlock_mutex'(Mutex). -'$thread_peek_message2'(_, _, Mutex) :- - '$unlock_mutex'(Mutex), - fail. + recorded('$thread_alias',[Id|Queue],_R), !, + '$message_peek_message'(Id, Term). +tthread_peek_message(Queue, Term) :- + '$message_queue_peek'(Queue, Term). %% @}