more thread fixes.

git-svn-id: https://yap.svn.sf.net/svnroot/yap/trunk@2301 b08c6af1-5177-4d33-ba66-4b1c6b8b522a
This commit is contained in:
vsc 2008-08-08 14:05:34 +00:00
parent 2581c3a3bf
commit 5776abb31c
6 changed files with 183 additions and 27 deletions

150
C/dbase.c
View File

@ -1837,11 +1837,13 @@ new_lu_db_entry(Term t, PredEntry *pe)
#if defined(YAPOR) || defined(THREADS)
// INIT_LOCK(cl->ClLock);
INIT_CLREF_COUNT(cl);
#endif
ipc->opc = Yap_opcode(_copy_idb_term);
#else
if (needs_vars)
ipc->opc = Yap_opcode(_copy_idb_term);
else
ipc->opc = Yap_opcode(_unify_idb_term);
#endif
return cl;
}
@ -5036,6 +5038,44 @@ p_enqueue(void)
return TRUE;
}
static Int
p_enqueue_unlocked(void)
{
Term Father = Deref(ARG1);
Term t;
QueueEntry *x;
db_queue *father_key;
if (IsVarTerm(Father)) {
Yap_Error(INSTANTIATION_ERROR, Father, "enqueue");
return FALSE;
} else if (!IsIntegerTerm(Father)) {
Yap_Error(TYPE_ERROR_INTEGER, Father, "enqueue");
return FALSE;
} else
father_key = (db_queue *)IntegerOfTerm(Father);
while ((x = (QueueEntry *)AllocDBSpace(sizeof(QueueEntry))) == NULL) {
if (!Yap_growheap(FALSE, sizeof(QueueEntry), NULL)) {
Yap_Error(OUT_OF_HEAP_ERROR, TermNil, "in findall");
return FALSE;
}
}
/* Yap_LUClauseSpace += sizeof(QueueEntry); */
t = Deref(ARG1);
x->DBT = StoreTermInDB(Deref(ARG2), 2);
if (x->DBT == NULL) {
return FALSE;
}
x->next = NULL;
if (father_key->LastInQueue != NULL)
father_key->LastInQueue->next = x;
father_key->LastInQueue = x;
if (father_key->FirstInQueue == NULL) {
father_key->FirstInQueue = x;
}
return TRUE;
}
/* when reading an entry in the data base we are making it accessible from
the outside. If the entry was removed, and this was the last pointer, the
target entry would be immediately removed, leading to dangling pointers.
@ -5120,6 +5160,108 @@ p_dequeue(void)
}
}
static Int
p_dequeue_unlocked(void)
{
db_queue *father_key;
QueueEntry *cur_instance, *prev_instance;
Term Father = Deref(ARG1);
if (IsVarTerm(Father)) {
Yap_Error(INSTANTIATION_ERROR, Father, "dequeue");
return FALSE;
} else if (!IsIntegerTerm(Father)) {
Yap_Error(TYPE_ERROR_INTEGER, Father, "dequeue");
return FALSE;
} else
father_key = (db_queue *)IntegerOfTerm(Father);
prev_instance = NULL;
cur_instance = father_key->FirstInQueue;
while (cur_instance) {
Term TDB;
while ((TDB = GetDBTerm(cur_instance->DBT)) == 0L) {
if (Yap_Error_TYPE == OUT_OF_ATTVARS_ERROR) {
Yap_Error_TYPE = YAP_NO_ERROR;
if (!Yap_growglobal(NULL)) {
Yap_Error(OUT_OF_ATTVARS_ERROR, TermNil, Yap_ErrorMessage);
return FALSE;
}
} else {
Yap_Error_TYPE = YAP_NO_ERROR;
if (!Yap_gcl(Yap_Error_Size, 2, YENV, P)) {
Yap_Error(OUT_OF_STACK_ERROR, TermNil, Yap_ErrorMessage);
return FALSE;
}
}
}
if (Yap_unify(ARG2, TDB)) {
if (prev_instance) {
prev_instance->next = cur_instance->next;
if (father_key->LastInQueue == cur_instance)
father_key->LastInQueue = prev_instance;
} else if (cur_instance == father_key->LastInQueue)
father_key->FirstInQueue = father_key->LastInQueue = NULL;
else
father_key->FirstInQueue = cur_instance->next;
/* release space for cur_instance */
keepdbrefs(cur_instance->DBT);
ErasePendingRefs(cur_instance->DBT);
FreeDBSpace((char *) cur_instance->DBT);
FreeDBSpace((char *) cur_instance);
return TRUE;
} else {
prev_instance = cur_instance;
cur_instance = cur_instance->next;
}
}
/* an empty queue automatically goes away */
return FALSE;
}
static Int
p_peek_queue(void)
{
db_queue *father_key;
QueueEntry *cur_instance;
Term Father = Deref(ARG1);
if (IsVarTerm(Father)) {
Yap_Error(INSTANTIATION_ERROR, Father, "dequeue");
return FALSE;
} else if (!IsIntegerTerm(Father)) {
Yap_Error(TYPE_ERROR_INTEGER, Father, "dequeue");
return FALSE;
} else
father_key = (db_queue *)IntegerOfTerm(Father);
cur_instance = father_key->FirstInQueue;
while (cur_instance) {
Term TDB;
while ((TDB = GetDBTerm(cur_instance->DBT)) == 0L) {
if (Yap_Error_TYPE == OUT_OF_ATTVARS_ERROR) {
Yap_Error_TYPE = YAP_NO_ERROR;
if (!Yap_growglobal(NULL)) {
Yap_Error(OUT_OF_ATTVARS_ERROR, TermNil, Yap_ErrorMessage);
return FALSE;
}
} else {
Yap_Error_TYPE = YAP_NO_ERROR;
if (!Yap_gcl(Yap_Error_Size, 2, YENV, P)) {
Yap_Error(OUT_OF_STACK_ERROR, TermNil, Yap_ErrorMessage);
return FALSE;
}
}
}
if (Yap_unify(ARG2, TDB)) {
return TRUE;
}
cur_instance = cur_instance->next;
}
return FALSE;
}
static Int
p_clean_queues(void)
{
@ -5205,7 +5347,8 @@ static void
ReleaseTermFromDB(DBTerm *ref)
{
keepdbrefs(ref);
FreeDBSpace((char *)ref);
ErasePendingRefs(ref);
FreeDBSpace((char *) ref);
}
void
@ -5282,7 +5425,10 @@ Yap_InitDBPreds(void)
Yap_InitCPred("$init_db_queue", 1, p_init_queue, SafePredFlag|SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("$db_key", 2, p_db_key, HiddenPredFlag);
Yap_InitCPred("$db_enqueue", 2, p_enqueue, SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("$db_enqueue_unlocked", 2, p_enqueue_unlocked, SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("$db_dequeue", 2, p_dequeue, SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("$db_dequeue_unlocked", 2, p_dequeue_unlocked, SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("$db_peek_queue", 2, p_peek_queue, SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("$db_clean_queues", 1, p_clean_queues, SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("$switch_log_upd", 1, p_slu, SafePredFlag|SyncPredFlag|HiddenPredFlag);
Yap_InitCPred("$log_upd", 1, p_lu, SafePredFlag|SyncPredFlag|HiddenPredFlag);

View File

@ -136,7 +136,6 @@ kill_thread_engine (int wid, int always_die)
static void
thread_die(int wid, int always_die)
{
if (!always_die) {
/* called by thread itself */
ThreadsTotalTime += Yap_cputime();
@ -200,7 +199,7 @@ thread_run(void *widp)
}
}
} while (t == 0);
free(ThreadHandle[myworker_id].tgoal);
Yap_ReleaseTermFromDB(ThreadHandle[myworker_id].tgoal);
ThreadHandle[myworker_id].tgoal = NULL;
tgs[1] = ThreadHandle[worker_id].tdetach;
tgoal = Yap_MkApplTerm(FunctorThreadRun, 2, tgs);
@ -488,6 +487,7 @@ p_thread_exit(void)
{
thread_die(worker_id, FALSE);
pthread_exit(NULL);
/* done, just make gcc happy */
return TRUE;
}

View File

@ -10,7 +10,7 @@
* File: Heap.h *
* mods: *
* comments: Heap Init Structure *
* version: $Id: Heap.h,v 1.135 2008-08-07 20:51:23 vsc Exp $ *
* version: $Id: Heap.h,v 1.136 2008-08-08 14:05:34 vsc Exp $ *
*************************************************************************/
/* information that can be stored in Code Space */
@ -584,7 +584,7 @@ typedef struct various_codes {
} all_heap_codes;
#ifdef USE_SYSTEM_MALLOC
struct various_codes *Yap_heap_regs;
extern struct various_codes *Yap_heap_regs;
#else
#define Yap_heap_regs ((all_heap_codes *)HEAP_INIT_BASE)
#endif

View File

@ -10,7 +10,7 @@
* File: Regs.h *
* mods: *
* comments: YAP abstract machine registers *
* version: $Id: Regs.h,v 1.40 2008-03-25 22:03:13 vsc Exp $ *
* version: $Id: Regs.h,v 1.41 2008-08-08 14:05:34 vsc Exp $ *
*************************************************************************/
@ -763,7 +763,7 @@ EXTERN inline void restore_B(void) {
when we come from a longjmp */
#if PUSH_REGS
/* In this case we need to initialise the abstract registers */
REGSTORE Yap_standard_regs;
extern REGSTORE Yap_standard_regs;
#endif /* PUSH_REGS */
/******************* controlling debugging ****************************/

View File

@ -186,6 +186,7 @@ typedef struct dbterm_list {
#define INIT_CLREF_COUNT(X) (X)->ClRefCount = 0
#define INC_CLREF_COUNT(X) (X)->ClRefCount++
#define DEC_CLREF_COUNT(X) (X)->ClRefCount--
#define CL_IN_USE(X) ((X)->ClRefCount)
#else
#define INIT_CLREF_COUNT(X)

View File

@ -30,8 +30,6 @@
recorda('$thread_alias', [0|main], _).
'$init_thread0' :-
recorda('$thread_defaults', [0, 0, 0, false, true], _),
'$new_mutex'(QId),
assert('$global_queue_mutex'(QId)),
'$create_thread_mq'(0),
'$new_mutex'(Id),
assert('$with_mutex_mutex'(Id)).
@ -123,7 +121,8 @@ thread_create(Goal, Id, Options) :-
erase(R),
fail.
'$erase_thread_info'(Id) :-
message_queue_destroy(Id),
recorded('$queue',q(Id,_,_,_,QKey),_),
'$empty_mqueue'(QKey),
fail.
'$erase_thread_info'(_).
@ -527,10 +526,7 @@ message_queue_create(Id, Options) :-
var(Options), !,
'$do_error'(instantiation_error, message_queue_create(Id, Options)).
message_queue_create(Id, []) :- !,
'$new_mutex'(Mutex),
'$cond_create'(Cond),
'$mq_new_id'(Id, NId, Key),
recorda('$queue',q(Id,Mutex,Cond,NId,Key), _).
'$do_msg_queue_create'(Id).
message_queue_create(Id, [alias(Alias)]) :-
var(Alias), !,
'$do_error'(instantiation_error, message_queue_create(Id, [alias(Alias)])).
@ -560,7 +556,17 @@ message_queue_create(Id) :-
; '$do_error'(type_error(variable, Id), message_queue_create(Id))
).
'$do_msg_queue_create'(Id) :-
\+ recorded('$queue',q(Id,_,_,_,_), _),
'$new_mutex'(Mutex),
'$cond_create'(Cond),
'$mq_new_id'(Id, NId, Key),
recorda('$queue',q(Id,Mutex,Cond,NId,Key), _),
fail.
'$do_msg_queue_create'(_).
'$create_thread_mq'(TId) :-
\+ recorded('$queue',q(TId,_,_,_,_), _),
'$new_mutex'(Mutex),
'$cond_create'(Cond),
'$mq_new_id'(TId, TId, Key),
@ -572,13 +578,12 @@ message_queue_create(Id) :-
'$mq_new_id'(Id, Id, AtId) :-
integer(Id), !,
\+ recorded('$queue', q(_,_,_,Id,_), _),
atomic_concat('$queue__',Id,AtId),
!.
'$init_db_queue'(AtId).
'$mq_new_id'(_, Id, AtId) :-
'$integers'(Id),
\+ recorded('$queue', q(_,_,_,Id,_), _),
atomic_concat('$queue__',Id,AtId),
!.
!,
'$init_db_queue'(AtId).
'$integers'(-1).
'$integers'(I) :-
@ -597,10 +602,10 @@ message_queue_destroy(_).
'$message_queue_destroy'(Queue) :-
recorded('$queue',q(Queue,Mutex,Cond,_,QKey),R), !,
erase(R),
'$clean_mqueue'(QKey),
'$cond_destroy'(Cond),
'$destroy_mutex'(Mutex),
'$clean_mqueue'(QKey).
erase(R).
'$message_queue_destroy'(Queue) :-
atomic(Queue), !,
'$do_error'(existence_error(message_queue,Queue),message_queue_destroy(Queue)).
@ -608,11 +613,14 @@ message_queue_destroy(_).
'$do_error'(type_error(atom,Name),message_queue_destroy(Name)).
'$clean_mqueue'(Queue) :-
recorded(Queue,_,R),
erase(R),
'$db_dequeue'(Queue),
fail.
'$clean_mqueue'(_).
'$empty_mqueue'(Queue) :-
'$db_dequeue_unlocked'(Queue),
fail.
'$empty_mqueue'(_).
message_queue_property(Id, Prop) :-
( nonvar(Id) ->
@ -673,7 +681,8 @@ thread_send_message(_, _).
'$do_thread_send_message'(Queue, Term) :-
recorded('$queue',q(Queue,Mutex,Cond,_,Key),_), !,
'$lock_mutex'(Mutex),
recordz(Key,Term,_),
'$db_enqueue_unlocked'(Key, Term),
% write(+Queue:Term),nl,
'$cond_signal'(Cond),
'$unlock_mutex'(Mutex).
'$do_thread_send_message'(Queue, Term) :-
@ -691,14 +700,14 @@ thread_get_message(Queue, Term) :-
thread_get_message(Queue, Term) :-
recorded('$queue',q(Queue,Mutex,Cond,_,Key),_), !,
'$lock_mutex'(Mutex),
% write(-Queue:Term),nl,
'$thread_get_message_loop'(Key, Term, Mutex, Cond).
thread_get_message(Queue, Term) :-
'$do_error'(existence_error(message_queue,Queue),thread_get_message(Queue,Term)).
'$thread_get_message_loop'(Key, Term, Mutex, _) :-
recorded(Key,Term,R), !,
erase(R),
'$db_dequeue_unlocked'(Key, Term), !,
'$unlock_mutex'(Mutex).
'$thread_get_message_loop'(Key, Term, Mutex, Cond) :-
'$cond_wait'(Cond, Mutex),
@ -722,7 +731,7 @@ thread_peek_message(Queue, Term) :-
'$thread_peek_message2'(Key, Term, Mutex) :-
recorded(Key,Term,_), !,
'$db_peek_queue'(Key, Term), !,
'$unlock_mutex'(Mutex).
'$thread_peek_message2'(_, _, Mutex) :-
'$unlock_mutex'(Mutex),