make thread code more stable

git-svn-id: https://yap.svn.sf.net/svnroot/yap/trunk@2063 b08c6af1-5177-4d33-ba66-4b1c6b8b522a
This commit is contained in:
vsc 2008-01-27 11:01:07 +00:00
parent 4ae454aac6
commit ff8213e506
9 changed files with 149 additions and 61 deletions

View File

@ -10,8 +10,12 @@
* *
* File: absmi.c *
* comments: Portable abstract machine interpreter *
* Last rev: $Date: 2008-01-23 17:57:44 $,$Author: vsc $ *
* Last rev: $Date: 2008-01-27 11:01:06 $,$Author: vsc $ *
* $Log: not supported by cvs2svn $
* Revision 1.233 2008/01/23 17:57:44 vsc
* valgrind it!
* enable atom garbage collection.
*
* Revision 1.232 2007/11/28 23:52:14 vsc
* junction tree algorithm
*
@ -2306,9 +2310,12 @@ Yap_absmi(int inp)
}
} else if ((*pt & (LogUpdMask|IndexMask)) == (LogUpdMask|IndexMask)) {
LogUpdIndex *cl = ClauseFlagsToLogUpdIndex(pt);
#if defined(YAPOR) || defined(THREADS)
PredEntry *ap = cl->ClPred;
#endif
int erase;
LOCK(cl->ClPred->PELock);
LOCK(ap->PELock);
DEC_CLREF_COUNT(cl);
cl->ClFlags &= ~InUseMask;
erase = (cl->ClFlags & (DirtyMask|ErasedMask)) && !(cl->ClRefCount);
@ -2322,7 +2329,7 @@ Yap_absmi(int inp)
Yap_CleanUpIndex(cl);
setregs();
}
UNLOCK(cl->ClPred->PELock);
UNLOCK(ap->PELock);
} else {
TrailTerm(pt0) = d1;
pt0++;

View File

@ -1371,7 +1371,7 @@ CreateDBStruct(Term Tm, DBProp p, int InFlag, int *pstat, UInt extra_size, struc
/* place DBRefs in ConsultStack */
DBRef *TmpRefBase = (DBRef *)Yap_TrailTop;
CELL *CodeAbs; /* how much code did we find */
int vars_found;
int vars_found = FALSE;
Yap_Error_TYPE = YAP_NO_ERROR;

View File

@ -913,6 +913,10 @@ InitCodes(void)
#endif /* YAPOR */
#endif /* TABLING */
Yap_heap_regs->expand_op_code = Yap_opcode(_expand_index);
INIT_LOCK(Yap_heap_regs->expand_clauses_list_lock);
#ifdef LOW_LEVEL_TRACER
INIT_LOCK(Yap_heap_regs->low_level_trace_lock);
#endif
Yap_heap_regs->expand_clauses_first = NULL;
Yap_heap_regs->expand_clauses_last = NULL;
Yap_heap_regs->expand_clauses = 0;
@ -953,7 +957,9 @@ InitCodes(void)
int i;
for (i=0; i < MAX_WORKERS; i++) {
Yap_heap_regs->thread_handle[i].in_use = FALSE;
Yap_heap_regs->thread_handle[i].zombie = FALSE;
Yap_heap_regs->thread_handle[i].local_preds = NULL;
pthread_mutex_init(&Yap_heap_regs->thread_handle[i].tlock, NULL);
}
}
Yap_heap_regs->thread_handle[0].id = 0;
@ -980,6 +986,8 @@ InitCodes(void)
{
int i;
for (i=0; i < MAX_WORKERS; i++) {
INIT_LOCK(Yap_heap_regs->wl[i].signal_lock);
Yap_heap_regs->wl[i].active_signals = 0;
Yap_heap_regs->wl[i].scratchpad.ptr = NULL;
Yap_heap_regs->wl[i].scratchpad.sz = SCRATCH_START_SIZE;
Yap_heap_regs->wl[i].scratchpad.msz = SCRATCH_START_SIZE;
@ -1003,6 +1011,7 @@ InitCodes(void)
Yap_heap_regs->wl[i].consultcapacity = InitialConsultCapacity;
Yap_heap_regs->wl[i].consultbase = Yap_heap_regs->wl[i].consultsp =
Yap_heap_regs->wl[i].consultlow + Yap_heap_regs->wl[i].consultcapacity;
Yap_heap_regs->wl[i].Gc_timestamp = 0;
}
}
#else
@ -1285,6 +1294,7 @@ InitCodes(void)
Yap_heap_regs->db_erased_marker =
(DBRef)Yap_AllocCodeSpace(sizeof(DBStruct));
Yap_LUClauseSpace += sizeof(DBStruct);
INIT_LOCK(Yap_heap_regs->dbterms_list_lock);
Yap_heap_regs->dbterms_list = NULL;
Yap_heap_regs->db_erased_marker->id = FunctorDBRef;
Yap_heap_regs->db_erased_marker->Flags = ErasedMask;

View File

@ -5155,8 +5155,12 @@ format(volatile Term otail, volatile Term oargs, int sno)
if (targ > tnum-1 || has_repeats)
goto do_consistency_error;
t = targs[targ++];
if (!LCL0[-30])
fprintf(stderr,"OOPS %d\n",LCL0-ASP);
Yap_StartSlots();
Yap_plwrite (t, f_putc, Handle_vars_f|To_heap_f);
if (!LCL0[-30])
fprintf(stderr,"OOPS %d\n",LCL0-ASP);
FormatInfo = &finfo;
ASP++;
break;
@ -5280,7 +5284,11 @@ format(volatile Term otail, volatile Term oargs, int sno)
if (Stream[sno].status & InMemory_Stream_f) {
Stream[sno].u.mem_string.error_handler = old_handler;
}
if (!LCL0[-30])
fprintf(stderr,"OOPS 3 %d\n",LCL0-ASP);
format_clean_up(finfo.format_base, fstr, targs);
if (!LCL0[-30])
fprintf(stderr,"OOPS 4 %d\n",LCL0-ASP);
return (TRUE);
}

View File

@ -44,10 +44,10 @@ allocate_new_tid(void)
int new_worker_id = 0;
LOCK(ThreadHandlesLock);
while(new_worker_id < MAX_WORKERS &&
ThreadHandle[new_worker_id].in_use == TRUE)
(ThreadHandle[new_worker_id].in_use == TRUE ||
ThreadHandle[new_worker_id].zombie == TRUE) )
new_worker_id++;
ThreadHandle[new_worker_id].in_use = TRUE;
pthread_mutex_init(&ThreadHandle[new_worker_id].tlock, NULL);
pthread_mutex_lock(&(ThreadHandle[new_worker_id].tlock));
UNLOCK(ThreadHandlesLock);
if (new_worker_id == MAX_WORKERS)
@ -92,8 +92,9 @@ kill_thread_engine (int wid)
free(ThreadHandle[wid].default_yaam_regs);
free(ThreadHandle[wid].start_of_timesp);
free(ThreadHandle[wid].last_timep);
ThreadHandle[wid].in_use = FALSE;
pthread_mutex_destroy(&(ThreadHandle[wid].tlock));
pthread_mutex_lock(&(ThreadHandle[wid].tlock));
ThreadHandle[wid].zombie = FALSE;
pthread_mutex_unlock(&(ThreadHandle[wid].tlock));
}
static void
@ -106,8 +107,9 @@ thread_die(int wid, int always_die)
ThreadsTotalTime += Yap_cputime();
}
if (ThreadHandle[wid].tdetach == MkAtomTerm(AtomTrue) ||
always_die)
always_die) {
kill_thread_engine(wid);
}
UNLOCK(ThreadHandlesLock);
}
@ -141,12 +143,31 @@ start_thread(int myworker_id)
static void *
thread_run(void *widp)
{
Term tgoal;
Term tgoal, t;
Term tgs[2];
int myworker_id = *((int *)widp);
start_thread(myworker_id);
tgs[0] = Yap_FetchTermFromDB(ThreadHandle[worker_id].tgoal);
do {
t = tgs[0] = Yap_FetchTermFromDB(ThreadHandle[worker_id].tgoal);
if (t == 0) {
if (Yap_Error_TYPE == OUT_OF_ATTVARS_ERROR) {
Yap_Error_TYPE = YAP_NO_ERROR;
if (!Yap_growglobal(NULL)) {
Yap_Error(OUT_OF_ATTVARS_ERROR, TermNil, Yap_ErrorMessage);
thread_die(worker_id, FALSE);
return NULL;
}
} else {
Yap_Error_TYPE = YAP_NO_ERROR;
if (!Yap_growstack(ThreadHandle[worker_id].tgoal->NOfCells*CellSize)) {
Yap_Error(OUT_OF_STACK_ERROR, TermNil, Yap_ErrorMessage);
thread_die(worker_id, FALSE);
return NULL;
}
}
}
} while (t == 0);
tgs[1] = ThreadHandle[worker_id].tdetach;
tgoal = Yap_MkApplTerm(FunctorThreadRun, 2, tgs);
Yap_RunTopGoal(tgoal);
@ -185,7 +206,6 @@ p_create_thread(void)
ThreadHandle[new_worker_id].ref_count = 1;
if ((ThreadHandle[new_worker_id].ret = pthread_create(&ThreadHandle[new_worker_id].handle, NULL, thread_run, (void *)(&(ThreadHandle[new_worker_id].id)))) == 0) {
/* wait until the client is initialised */
pthread_mutex_lock(&(ThreadHandle[new_worker_id].tlock));
pthread_mutex_unlock(&(ThreadHandle[new_worker_id].tlock));
return TRUE;
}
@ -235,6 +255,24 @@ p_thread_self(void)
return Yap_unify(MkIntegerTerm(worker_id), ARG1);
}
static Int
p_thread_zombie_self(void)
{
/* make sure the lock is available */
if (pthread_getspecific(Yap_yaamregs_key) == NULL)
return Yap_unify(MkIntegerTerm(-1), ARG1);
pthread_mutex_lock(&(ThreadHandle[worker_id].tlock));
if (Yap_heap_regs->wl[worker_id].active_signals &= YAP_ITI_SIGNAL) {
pthread_mutex_unlock(&(ThreadHandle[worker_id].tlock));
return FALSE;
}
Yap_heap_regs->thread_handle[worker_id].in_use = FALSE;
Yap_heap_regs->thread_handle[worker_id].zombie = TRUE;
pthread_mutex_unlock(&(ThreadHandle[worker_id].tlock));
return Yap_unify(MkIntegerTerm(worker_id), ARG1);
}
Int
Yap_thread_self(void)
{
@ -291,7 +329,6 @@ Yap_thread_destroy_engine(int wid)
{
pthread_mutex_lock(&(ThreadHandle[wid].tlock));
if (ThreadHandle[wid].ref_count == 0) {
pthread_mutex_unlock(&(ThreadHandle[wid].tlock));
kill_thread_engine(wid);
return TRUE;
} else {
@ -307,7 +344,8 @@ p_thread_join(void)
Int tid = IntegerOfTerm(Deref(ARG1));
LOCK(ThreadHandlesLock);
if (!ThreadHandle[tid].in_use) {
if (!ThreadHandle[tid].in_use &&
!ThreadHandle[tid].zombie) {
UNLOCK(ThreadHandlesLock);
return FALSE;
}
@ -321,6 +359,7 @@ p_thread_join(void)
/* ERROR */
return FALSE;
}
/* notice mutex is already locked */
return TRUE;
}
@ -346,7 +385,8 @@ p_thread_detach(void)
static Int
p_thread_exit(void)
{
thread_die(worker_id, FALSE);
thread_die(worker_id, FALSE);
fprintf(stderr,"2 the end of worker_id=%d\n",worker_id);
pthread_exit(NULL);
return TRUE;
}
@ -385,7 +425,7 @@ static Int
p_valid_thread(void)
{
Int i = IntegerOfTerm(Deref(ARG1));
return ThreadHandle[i].in_use;
return ThreadHandle[i].in_use || ThreadHandle[i].zombie;
}
/* Mutex Support */
@ -593,6 +633,7 @@ void Yap_InitThreadPreds(void)
Yap_InitCPred("$thread_new_tid", 1, p_thread_new_tid, HiddenPredFlag);
Yap_InitCPred("$create_thread", 6, p_create_thread, HiddenPredFlag);
Yap_InitCPred("$thread_self", 1, p_thread_self, SafePredFlag|HiddenPredFlag);
Yap_InitCPred("$thread_zombie_self", 1, p_thread_zombie_self, SafePredFlag|HiddenPredFlag);
Yap_InitCPred("$thread_join", 1, p_thread_join, HiddenPredFlag);
Yap_InitCPred("$thread_destroy", 1, p_thread_destroy, HiddenPredFlag);
Yap_InitCPred("thread_yield", 0, p_thread_yield, 0);

View File

@ -164,19 +164,6 @@ low_level_trace(yap_low_level_port port, PredEntry *pred, CELL *args)
LOCK(Yap_heap_regs->low_level_trace_lock);
sc = Yap_heap_regs;
vsc_count++;
if (LCL0[-30]) {
UNLOCK(Yap_heap_regs->low_level_trace_lock);
old_p[worker_id] = pred;
old_x2[worker_id] = ARG2;
old_x1[worker_id] = ARG1;
old_x3[worker_id] = ARG3;
return;
}
jmp_deb(worker_id);
old_p[worker_id] = pred;
old_x2[worker_id] = ARG2;
old_x1[worker_id] = ARG1;
old_x3[worker_id] = ARG3;
#ifdef COMMENTED
//*(H0+(0xb65f2850-0xb64b2008)/sizeof(CELL))==0xc ||
//0x4fd4d

View File

@ -10,7 +10,7 @@
* File: Heap.h *
* mods: *
* comments: Heap Init Structure *
* version: $Id: Heap.h,v 1.123 2008-01-23 17:57:53 vsc Exp $ *
* version: $Id: Heap.h,v 1.124 2008-01-27 11:01:07 vsc Exp $ *
*************************************************************************/
/* information that can be stored in Code Space */
@ -178,6 +178,7 @@ typedef struct worker_local_struct {
#ifdef THREADS
typedef struct thandle {
int in_use;
int zombie;
UInt ssize;
UInt tsize;
Term tdetach;

View File

@ -17,6 +17,8 @@
<h2>Yap-5.1.3:</h2>
<ul>
<li> FIXED: overflow at entry goal (obs from Paulo Moura).</li>
<li> FIXED: bug with signals while process wasa dying (obs from Paulo Moura).</li>
<li> FIXED: compilation in OSX/64 (patches from Will Benton).</li>
<li> FIXED: do not access erased code (obs from Paulo Moura).</li>
<li> FIXED: clause/3 should not follow fail.</li>

View File

@ -28,11 +28,13 @@
'$init_thread0' :-
no_threads, !.
'$init_thread0' :-
'$create_mq'(0),
'$record_thread_info'(0, main, [0, 0, 0], false, '$init_thread0'),
recorda('$thread_defaults', [0, 0, 0, false], _),
'$new_mutex'(QId),
assert('$global_queue_mutex'(QId)),
'$create_mq'(0),
'$new_mutex'(Id),
recorda('$with_mutex_mutex',Id,_).
assert('$with_mutex_mutex'(Id)).
'$top_thread_goal'(G, Detached) :-
'$thread_self'(Id),
@ -41,23 +43,29 @@
% always finish with a throw to make sure we clean stacks.
'$system_catch'((G -> throw('$thread_finished'(true)) ; throw('$thread_finished'(false))),Module,Exception,'$close_thread'(Exception,Detached)).
'$close_thread'('$thread_finished'(Status), Detached) :- !,
'$thread_self'(Id0),
'$close_thread'(Status, Detached) :-
'$thread_zombie_self'(Id0), !,
'$close_thread'(Status, Detached, Id0).
'$close_thread'(Status, Detached) :- !,
% one self will fail if it had messages
'$close_thread'(Status, Detached).
'$close_thread'('$thread_finished'(Status), Detached, Id0) :- !,
'$run_at_thread_exit'(Id0),
(Detached == true ->
true
;
recorda('$thread_exit_status', [Id0|Status], _)
),
% format(user_error,'closing thread ~w~n',[v([Id0|Status])]).
'$run_at_thread_exit'(Id0).
).
% format(user_error,'closing thread ~w~n',[v([Id0|Status])]).
'$close_thread'(Exception,Detached) :-
'$thread_self'(Id0),
'$run_at_thread_exit'(Id0),
(Detached == true ->
true
;
recorda('$thread_exit_status', [Id0|exception(Exception)], _)
),
'$run_at_thread_exit'(Id0).
).
thread_create(Goal) :-
G0 = thread_create(Goal),
@ -283,14 +291,13 @@ thread_exit(Term) :-
var(Term), !,
'$do_error'(instantiation_error, thread_exit(Term)).
thread_exit(Term) :-
'$thread_self'(Id0),
'$run_at_thread_exit'(Id0),
recorda('$thread_exit_status', [Id0|exited(Term)], _),
'$thread_exit'.
'$close_thread'('$thread_finished'(exited(Term)), Detached).
'$run_at_thread_exit'(Id0) :-
findall(Hook, (recorded('$thread_exit_hook',[Id0|Hook],R), erase(R)), Hooks),
'$run_thread_hooks'(Hooks),
fail.
'$run_at_thread_exit'(Id0) :-
message_queue_destroy(Id0).
'$run_thread_hooks'([]).
@ -453,15 +460,15 @@ with_mutex(M, G) :-
'$do_error'(type_error(callable,G),with_mutex(M, G)).
with_mutex(M, G) :-
atom(M), !,
recorded('$with_mutex_mutex',WMId,_),
'$with_mutex_mutex'(WMId),
'$lock_mutex'(WMId),
( recorded('$mutex_alias',[Id|M],_) ->
true
; '$new_mutex'(Id),
recorda('$mutex_alias',[Id|M],_)
),
'$unlock_mutex'(WMId),
'$lock_mutex'(Id),
'$unlock_mutex'(WMId),
( catch('$execute'(G), E, ('$unlock_mutex'(Id), throw(E))) ->
'$unlock_mutex'(Id)
; '$unlock_mutex'(Id),
@ -532,10 +539,7 @@ message_queue_create(_, [alias(Alias)]) :- % TEMPORARY FIX
message_queue_create(Cond) :-
var(Cond), !,
mutex_create(Mutex),
'$cond_create'(Cond),
'$mq_iname'(Cond, CName),
recorda('$queue',q(Cond,Mutex,Cond,CName), _).
'$create_mq'(Cond).
message_queue_create(Name) :-
atom(Name),
recorded('$thread_alias',[_,Name],_), !,
@ -547,10 +551,18 @@ message_queue_create(Name) :-
'$do_error'(type_error(atom,Name),message_queue_create(Name)).
'$create_mq'(Name) :-
mutex_create(Mutex),
'$new_mutex'(Mutex),
'$cond_create'(Cond),
'$mq_iname'(Name, CName),
recorda('$queue',q(Name,Mutex,Cond, CName),_).
'$global_queue_mutex'(QMutex),
'$lock_mutex'(QMutex),
( recorded('$queue',q(Name,_,_, _),_) ->
'$unlock_mutex'(QMutex),
'$do_error'(permission_error(create,message_queue,Name),message_queue_create(Name))
;
recorda('$queue',q(Name,Mutex,Cond, CName),_),
'$unlock_mutex'(QMutex)
).
'$mq_iname'(I,X) :-
integer(I), !,
@ -564,13 +576,18 @@ message_queue_destroy(Name) :-
var(Name), !,
'$do_error'(instantiation_error,message_queue_destroy(Name)).
message_queue_destroy(Queue) :-
'$global_queue_mutex'(QMutex),
'$lock_mutex'(QMutex),
recorded('$queue',q(Queue,Mutex,Cond,CName),R), !,
erase(R),
'$cond_destroy'(Cond),
mutex_destroy(Mutex),
'$destroy_mutex'(Mutex),
'$unlock_mutex'(QMutex),
'$clean_mqueue'(CName).
message_queue_destroy(Queue) :-
atom(Queue), !,
'$global_queue_mutex'(QMutex),
'$unlock_mutex'(QMutex),
atomic(Queue), !,
'$do_error'(existence_error(message_queue,Queue),message_queue_destroy(Queue)).
message_queue_destroy(Name) :-
'$do_error'(type_error(atom,Name),message_queue_destroy(Name)).
@ -591,12 +608,17 @@ thread_send_message(Queue, Term) :-
recorded('$thread_alias',[Id|Queue],_), !,
thread_send_message(Id, Term).
thread_send_message(Queue, Term) :-
'$global_queue_mutex'(QMutex),
'$lock_mutex'(QMutex),
recorded('$queue',q(Queue,Mutex,Cond,Key),_), !,
mutex_lock(Mutex),
'$lock_mutex'(Mutex),
'$unlock_mutex'(QMutex),
recordz(Key,Term,_),
'$cond_broadcast'(Cond),
mutex_unlock(Mutex).
'$unlock_mutex'(Mutex).
thread_send_message(Queue, Term) :-
'$global_queue_mutex'(QMutex),
'$unlock_mutex'(QMutex),
'$do_error'(existence_error(message_queue,Queue),thread_send_message(Queue,Term)).
thread_get_message(Term) :-
@ -609,17 +631,22 @@ thread_get_message(Queue, Term) :-
recorded('$thread_alias',[Id|Queue],_), !,
thread_get_message(Id, Term).
thread_get_message(Queue, Term) :-
'$global_queue_mutex'(QMutex),
'$lock_mutex'(QMutex),
recorded('$queue',q(Queue,Mutex,Cond,Key),_), !,
mutex_lock(Mutex),
'$lock_mutex'(Mutex),
'$unlock_mutex'(QMutex),
'$thread_get_message_loop'(Key, Term, Mutex, Cond).
thread_get_message(Queue, Term) :-
'$global_queue_mutex'(QMutex),
'$unlock_mutex'(QMutex),
'$do_error'(existence_error(message_queue,Queue),thread_get_message(Queue,Term)).
'$thread_get_message_loop'(Key, Term, Mutex, _) :-
recorded(Key,Term,R), !,
erase(R),
mutex_unlock(Mutex).
'$unlock_mutex'(Mutex).
'$thread_get_message_loop'(Key, Term, Mutex, Cond) :-
'$cond_wait'(Cond, Mutex),
'$thread_get_message_loop'(Key, Term, Mutex, Cond).
@ -634,18 +661,23 @@ thread_peek_message(Queue, Term) :-
recorded('$thread_alias',[Id|Queue],_), !,
thread_peek_message(Id, Term).
thread_peek_message(Queue, Term) :-
'$global_queue_mutex'(QMutex),
'$lock_mutex'(QMutex),
recorded('$queue',q(Queue,Mutex,_,Key),_), !,
mutex_lock(Mutex),
'$lock_mutex'(Mutex),
'$unlock_mutex'(QMutex),
'$thread_peek_message2'(Key, Term, Mutex).
thread_peek_message(Queue, Term) :-
'$global_queue_mutex'(QMutex),
'$unlock_mutex'(QMutex),
'$do_error'(existence_error(message_queue,Queue),thread_peek_message(Queue,Term)).
'$thread_peek_message2'(Key, Term, Mutex) :-
recorded(Key,Term,_), !,
mutex_unlock(Mutex).
'$unlock_mutex'(Mutex).
'$thread_peek_message2'(_, _, Mutex) :-
mutex_unlock(Mutex),
'$unlock_mutex'(Mutex),
fail.
thread_local(X) :-