diff --git a/C/dbase.c b/C/dbase.c index a485b0bda..aa5dcb9bb 100644 --- a/C/dbase.c +++ b/C/dbase.c @@ -1837,11 +1837,13 @@ new_lu_db_entry(Term t, PredEntry *pe) #if defined(YAPOR) || defined(THREADS) // INIT_LOCK(cl->ClLock); INIT_CLREF_COUNT(cl); -#endif + ipc->opc = Yap_opcode(_copy_idb_term); +#else if (needs_vars) ipc->opc = Yap_opcode(_copy_idb_term); else ipc->opc = Yap_opcode(_unify_idb_term); +#endif return cl; } @@ -5036,6 +5038,44 @@ p_enqueue(void) return TRUE; } +static Int +p_enqueue_unlocked(void) +{ + Term Father = Deref(ARG1); + Term t; + QueueEntry *x; + db_queue *father_key; + + if (IsVarTerm(Father)) { + Yap_Error(INSTANTIATION_ERROR, Father, "enqueue"); + return FALSE; + } else if (!IsIntegerTerm(Father)) { + Yap_Error(TYPE_ERROR_INTEGER, Father, "enqueue"); + return FALSE; + } else + father_key = (db_queue *)IntegerOfTerm(Father); + while ((x = (QueueEntry *)AllocDBSpace(sizeof(QueueEntry))) == NULL) { + if (!Yap_growheap(FALSE, sizeof(QueueEntry), NULL)) { + Yap_Error(OUT_OF_HEAP_ERROR, TermNil, "in findall"); + return FALSE; + } + } + /* Yap_LUClauseSpace += sizeof(QueueEntry); */ + t = Deref(ARG1); + x->DBT = StoreTermInDB(Deref(ARG2), 2); + if (x->DBT == NULL) { + return FALSE; + } + x->next = NULL; + if (father_key->LastInQueue != NULL) + father_key->LastInQueue->next = x; + father_key->LastInQueue = x; + if (father_key->FirstInQueue == NULL) { + father_key->FirstInQueue = x; + } + return TRUE; +} + /* when reading an entry in the data base we are making it accessible from the outside. If the entry was removed, and this was the last pointer, the target entry would be immediately removed, leading to dangling pointers. @@ -5120,6 +5160,108 @@ p_dequeue(void) } } + +static Int +p_dequeue_unlocked(void) +{ + db_queue *father_key; + QueueEntry *cur_instance, *prev_instance; + Term Father = Deref(ARG1); + + if (IsVarTerm(Father)) { + Yap_Error(INSTANTIATION_ERROR, Father, "dequeue"); + return FALSE; + } else if (!IsIntegerTerm(Father)) { + Yap_Error(TYPE_ERROR_INTEGER, Father, "dequeue"); + return FALSE; + } else + father_key = (db_queue *)IntegerOfTerm(Father); + prev_instance = NULL; + cur_instance = father_key->FirstInQueue; + while (cur_instance) { + Term TDB; + while ((TDB = GetDBTerm(cur_instance->DBT)) == 0L) { + if (Yap_Error_TYPE == OUT_OF_ATTVARS_ERROR) { + Yap_Error_TYPE = YAP_NO_ERROR; + if (!Yap_growglobal(NULL)) { + Yap_Error(OUT_OF_ATTVARS_ERROR, TermNil, Yap_ErrorMessage); + return FALSE; + } + } else { + Yap_Error_TYPE = YAP_NO_ERROR; + if (!Yap_gcl(Yap_Error_Size, 2, YENV, P)) { + Yap_Error(OUT_OF_STACK_ERROR, TermNil, Yap_ErrorMessage); + return FALSE; + } + } + } + if (Yap_unify(ARG2, TDB)) { + if (prev_instance) { + prev_instance->next = cur_instance->next; + if (father_key->LastInQueue == cur_instance) + father_key->LastInQueue = prev_instance; + } else if (cur_instance == father_key->LastInQueue) + father_key->FirstInQueue = father_key->LastInQueue = NULL; + else + father_key->FirstInQueue = cur_instance->next; + /* release space for cur_instance */ + keepdbrefs(cur_instance->DBT); + ErasePendingRefs(cur_instance->DBT); + FreeDBSpace((char *) cur_instance->DBT); + FreeDBSpace((char *) cur_instance); + return TRUE; + } else { + prev_instance = cur_instance; + cur_instance = cur_instance->next; + } + } + /* an empty queue automatically goes away */ + return FALSE; +} + +static Int +p_peek_queue(void) +{ + db_queue *father_key; + QueueEntry *cur_instance; + Term Father = Deref(ARG1); + + if (IsVarTerm(Father)) { + Yap_Error(INSTANTIATION_ERROR, Father, "dequeue"); + return FALSE; + } else if (!IsIntegerTerm(Father)) { + Yap_Error(TYPE_ERROR_INTEGER, Father, "dequeue"); + return FALSE; + } else + father_key = (db_queue *)IntegerOfTerm(Father); + cur_instance = father_key->FirstInQueue; + while (cur_instance) { + Term TDB; + while ((TDB = GetDBTerm(cur_instance->DBT)) == 0L) { + if (Yap_Error_TYPE == OUT_OF_ATTVARS_ERROR) { + Yap_Error_TYPE = YAP_NO_ERROR; + if (!Yap_growglobal(NULL)) { + Yap_Error(OUT_OF_ATTVARS_ERROR, TermNil, Yap_ErrorMessage); + return FALSE; + } + } else { + Yap_Error_TYPE = YAP_NO_ERROR; + if (!Yap_gcl(Yap_Error_Size, 2, YENV, P)) { + Yap_Error(OUT_OF_STACK_ERROR, TermNil, Yap_ErrorMessage); + return FALSE; + } + } + } + if (Yap_unify(ARG2, TDB)) { + return TRUE; + } + cur_instance = cur_instance->next; + } + return FALSE; +} + + + static Int p_clean_queues(void) { @@ -5205,7 +5347,8 @@ static void ReleaseTermFromDB(DBTerm *ref) { keepdbrefs(ref); - FreeDBSpace((char *)ref); + ErasePendingRefs(ref); + FreeDBSpace((char *) ref); } void @@ -5282,7 +5425,10 @@ Yap_InitDBPreds(void) Yap_InitCPred("$init_db_queue", 1, p_init_queue, SafePredFlag|SyncPredFlag|HiddenPredFlag); Yap_InitCPred("$db_key", 2, p_db_key, HiddenPredFlag); Yap_InitCPred("$db_enqueue", 2, p_enqueue, SyncPredFlag|HiddenPredFlag); + Yap_InitCPred("$db_enqueue_unlocked", 2, p_enqueue_unlocked, SyncPredFlag|HiddenPredFlag); Yap_InitCPred("$db_dequeue", 2, p_dequeue, SyncPredFlag|HiddenPredFlag); + Yap_InitCPred("$db_dequeue_unlocked", 2, p_dequeue_unlocked, SyncPredFlag|HiddenPredFlag); + Yap_InitCPred("$db_peek_queue", 2, p_peek_queue, SyncPredFlag|HiddenPredFlag); Yap_InitCPred("$db_clean_queues", 1, p_clean_queues, SyncPredFlag|HiddenPredFlag); Yap_InitCPred("$switch_log_upd", 1, p_slu, SafePredFlag|SyncPredFlag|HiddenPredFlag); Yap_InitCPred("$log_upd", 1, p_lu, SafePredFlag|SyncPredFlag|HiddenPredFlag); diff --git a/C/threads.c b/C/threads.c index 577be2de5..c83deada0 100644 --- a/C/threads.c +++ b/C/threads.c @@ -136,7 +136,6 @@ kill_thread_engine (int wid, int always_die) static void thread_die(int wid, int always_die) { - if (!always_die) { /* called by thread itself */ ThreadsTotalTime += Yap_cputime(); @@ -200,7 +199,7 @@ thread_run(void *widp) } } } while (t == 0); - free(ThreadHandle[myworker_id].tgoal); + Yap_ReleaseTermFromDB(ThreadHandle[myworker_id].tgoal); ThreadHandle[myworker_id].tgoal = NULL; tgs[1] = ThreadHandle[worker_id].tdetach; tgoal = Yap_MkApplTerm(FunctorThreadRun, 2, tgs); @@ -488,6 +487,7 @@ p_thread_exit(void) { thread_die(worker_id, FALSE); pthread_exit(NULL); + /* done, just make gcc happy */ return TRUE; } diff --git a/H/Heap.h b/H/Heap.h index e70c45e41..ba5c18972 100644 --- a/H/Heap.h +++ b/H/Heap.h @@ -10,7 +10,7 @@ * File: Heap.h * * mods: * * comments: Heap Init Structure * -* version: $Id: Heap.h,v 1.135 2008-08-07 20:51:23 vsc Exp $ * +* version: $Id: Heap.h,v 1.136 2008-08-08 14:05:34 vsc Exp $ * *************************************************************************/ /* information that can be stored in Code Space */ @@ -584,7 +584,7 @@ typedef struct various_codes { } all_heap_codes; #ifdef USE_SYSTEM_MALLOC -struct various_codes *Yap_heap_regs; +extern struct various_codes *Yap_heap_regs; #else #define Yap_heap_regs ((all_heap_codes *)HEAP_INIT_BASE) #endif diff --git a/H/Regs.h b/H/Regs.h index 8327eb35d..ea28f2520 100644 --- a/H/Regs.h +++ b/H/Regs.h @@ -10,7 +10,7 @@ * File: Regs.h * * mods: * * comments: YAP abstract machine registers * -* version: $Id: Regs.h,v 1.40 2008-03-25 22:03:13 vsc Exp $ * +* version: $Id: Regs.h,v 1.41 2008-08-08 14:05:34 vsc Exp $ * *************************************************************************/ @@ -763,7 +763,7 @@ EXTERN inline void restore_B(void) { when we come from a longjmp */ #if PUSH_REGS /* In this case we need to initialise the abstract registers */ -REGSTORE Yap_standard_regs; +extern REGSTORE Yap_standard_regs; #endif /* PUSH_REGS */ /******************* controlling debugging ****************************/ diff --git a/H/clause.h b/H/clause.h index 7c2e8ecd6..15d1baecc 100644 --- a/H/clause.h +++ b/H/clause.h @@ -186,6 +186,7 @@ typedef struct dbterm_list { #define INIT_CLREF_COUNT(X) (X)->ClRefCount = 0 #define INC_CLREF_COUNT(X) (X)->ClRefCount++ #define DEC_CLREF_COUNT(X) (X)->ClRefCount-- + #define CL_IN_USE(X) ((X)->ClRefCount) #else #define INIT_CLREF_COUNT(X) diff --git a/pl/threads.yap b/pl/threads.yap index 6571f504c..c83053bff 100644 --- a/pl/threads.yap +++ b/pl/threads.yap @@ -30,8 +30,6 @@ recorda('$thread_alias', [0|main], _). '$init_thread0' :- recorda('$thread_defaults', [0, 0, 0, false, true], _), - '$new_mutex'(QId), - assert('$global_queue_mutex'(QId)), '$create_thread_mq'(0), '$new_mutex'(Id), assert('$with_mutex_mutex'(Id)). @@ -123,7 +121,8 @@ thread_create(Goal, Id, Options) :- erase(R), fail. '$erase_thread_info'(Id) :- - message_queue_destroy(Id), + recorded('$queue',q(Id,_,_,_,QKey),_), + '$empty_mqueue'(QKey), fail. '$erase_thread_info'(_). @@ -527,10 +526,7 @@ message_queue_create(Id, Options) :- var(Options), !, '$do_error'(instantiation_error, message_queue_create(Id, Options)). message_queue_create(Id, []) :- !, - '$new_mutex'(Mutex), - '$cond_create'(Cond), - '$mq_new_id'(Id, NId, Key), - recorda('$queue',q(Id,Mutex,Cond,NId,Key), _). + '$do_msg_queue_create'(Id). message_queue_create(Id, [alias(Alias)]) :- var(Alias), !, '$do_error'(instantiation_error, message_queue_create(Id, [alias(Alias)])). @@ -560,7 +556,17 @@ message_queue_create(Id) :- ; '$do_error'(type_error(variable, Id), message_queue_create(Id)) ). +'$do_msg_queue_create'(Id) :- + \+ recorded('$queue',q(Id,_,_,_,_), _), + '$new_mutex'(Mutex), + '$cond_create'(Cond), + '$mq_new_id'(Id, NId, Key), + recorda('$queue',q(Id,Mutex,Cond,NId,Key), _), + fail. +'$do_msg_queue_create'(_). + '$create_thread_mq'(TId) :- + \+ recorded('$queue',q(TId,_,_,_,_), _), '$new_mutex'(Mutex), '$cond_create'(Cond), '$mq_new_id'(TId, TId, Key), @@ -572,13 +578,12 @@ message_queue_create(Id) :- '$mq_new_id'(Id, Id, AtId) :- integer(Id), !, \+ recorded('$queue', q(_,_,_,Id,_), _), - atomic_concat('$queue__',Id,AtId), - !. + '$init_db_queue'(AtId). '$mq_new_id'(_, Id, AtId) :- '$integers'(Id), \+ recorded('$queue', q(_,_,_,Id,_), _), - atomic_concat('$queue__',Id,AtId), - !. + !, + '$init_db_queue'(AtId). '$integers'(-1). '$integers'(I) :- @@ -597,10 +602,10 @@ message_queue_destroy(_). '$message_queue_destroy'(Queue) :- recorded('$queue',q(Queue,Mutex,Cond,_,QKey),R), !, - erase(R), + '$clean_mqueue'(QKey), '$cond_destroy'(Cond), '$destroy_mutex'(Mutex), - '$clean_mqueue'(QKey). + erase(R). '$message_queue_destroy'(Queue) :- atomic(Queue), !, '$do_error'(existence_error(message_queue,Queue),message_queue_destroy(Queue)). @@ -608,11 +613,14 @@ message_queue_destroy(_). '$do_error'(type_error(atom,Name),message_queue_destroy(Name)). '$clean_mqueue'(Queue) :- - recorded(Queue,_,R), - erase(R), + '$db_dequeue'(Queue), fail. '$clean_mqueue'(_). +'$empty_mqueue'(Queue) :- + '$db_dequeue_unlocked'(Queue), + fail. +'$empty_mqueue'(_). message_queue_property(Id, Prop) :- ( nonvar(Id) -> @@ -673,7 +681,8 @@ thread_send_message(_, _). '$do_thread_send_message'(Queue, Term) :- recorded('$queue',q(Queue,Mutex,Cond,_,Key),_), !, '$lock_mutex'(Mutex), - recordz(Key,Term,_), + '$db_enqueue_unlocked'(Key, Term), +% write(+Queue:Term),nl, '$cond_signal'(Cond), '$unlock_mutex'(Mutex). '$do_thread_send_message'(Queue, Term) :- @@ -691,14 +700,14 @@ thread_get_message(Queue, Term) :- thread_get_message(Queue, Term) :- recorded('$queue',q(Queue,Mutex,Cond,_,Key),_), !, '$lock_mutex'(Mutex), +% write(-Queue:Term),nl, '$thread_get_message_loop'(Key, Term, Mutex, Cond). thread_get_message(Queue, Term) :- '$do_error'(existence_error(message_queue,Queue),thread_get_message(Queue,Term)). '$thread_get_message_loop'(Key, Term, Mutex, _) :- - recorded(Key,Term,R), !, - erase(R), + '$db_dequeue_unlocked'(Key, Term), !, '$unlock_mutex'(Mutex). '$thread_get_message_loop'(Key, Term, Mutex, Cond) :- '$cond_wait'(Cond, Mutex), @@ -722,7 +731,7 @@ thread_peek_message(Queue, Term) :- '$thread_peek_message2'(Key, Term, Mutex) :- - recorded(Key,Term,_), !, + '$db_peek_queue'(Key, Term), !, '$unlock_mutex'(Mutex). '$thread_peek_message2'(_, _, Mutex) :- '$unlock_mutex'(Mutex),