improve threads support

git-svn-id: https://yap.svn.sf.net/svnroot/yap/trunk@976 b08c6af1-5177-4d33-ba66-4b1c6b8b522a
This commit is contained in:
vsc 2004-02-11 01:20:56 +00:00
parent 31b6bedf67
commit acbf57f59d
12 changed files with 227 additions and 103 deletions

View File

@ -518,7 +518,7 @@ Yap_NewThreadPred(PredEntry *ap)
p->ArityOfPE = ap->ArityOfPE;
p->cs.p_code.FirstClause = p->cs.p_code.LastClause = NULL;
p->cs.p_code.NOfClauses = 0;
p->PredFlags = 0L;
p->PredFlags = (ThreadLocalPredFlag|LogUpdatePredFlag);
p->src.OwnerFile = ap->src.OwnerFile;
p->OpcodeOfPred = UNDEF_OPCODE;
p->CodeOfPred = p->cs.p_code.TrueCodeOfPred = (yamop *)(&(p->OpcodeOfPred));
@ -577,6 +577,50 @@ Yap_NewPredPropByAtom(AtomEntry *ae, SMALLUNSGN cur_mod)
return (p0);
}
Prop
Yap_PredPropByFunctorNonThreadLocal(Functor f, SMALLUNSGN cur_mod)
/* get predicate entry for ap/arity; create it if neccessary. */
{
Prop p0;
FunctorEntry *fe = (FunctorEntry *)f;
WRITE_LOCK(fe->FRWLock);
p0 = fe->PropsOfFE;
while (p0) {
PredEntry *p = RepPredProp(p0);
if (/* p->KindOfPE != 0 || only props */
(p->ModuleOfPred == cur_mod || !(p->ModuleOfPred))) {
WRITE_UNLOCK(fe->FRWLock);
return (p0);
}
p0 = p->NextOfPE;
}
return Yap_NewPredPropByFunctor(fe,cur_mod);
}
Prop
Yap_PredPropByAtomNonThreadLocal(Atom at, SMALLUNSGN cur_mod)
/* get predicate entry for ap/arity; create it if neccessary. */
{
Prop p0;
AtomEntry *ae = RepAtom(at);
WRITE_LOCK(ae->ARWLock);
p0 = ae->PropsOfAE;
while (p0) {
PredEntry *pe = RepPredProp(p0);
if ( pe->KindOfPE == PEProp &&
(pe->ModuleOfPred == cur_mod || !pe->ModuleOfPred)) {
WRITE_UNLOCK(ae->ARWLock);
return(p0);
}
p0 = pe->NextOfPE;
}
return Yap_NewPredPropByAtom(ae,cur_mod);
}
Term
Yap_GetValue(Atom a)
{

View File

@ -12,7 +12,7 @@
* Last rev: *
* mods: *
* comments: allocating space *
* version:$Id: alloc.c,v 1.44 2004-02-05 16:56:58 vsc Exp $ *
* version:$Id: alloc.c,v 1.45 2004-02-11 01:20:56 vsc Exp $ *
*************************************************************************/
#ifdef SCCS
static char SccsId[] = "%W% %G%";
@ -163,6 +163,8 @@ InitExStacks(int Trail, int Stack)
Yap_LocalBase = Yap_GlobalBase + sa;
Yap_TrailBase = Yap_LocalBase + sizeof(CELL);
ScratchPad.ptr = NULL;
ScratchPad.sz = ScratchPad.msz = SCRATCH_START_SIZE;
AuxSp = NULL;
#ifdef DEBUG

View File

@ -1525,37 +1525,12 @@ p_endconsult(void)
return (TRUE);
}
static Int
p_purge_clauses(void)
{ /* '$purge_clauses'(+Func) */
PredEntry *pred;
Term t = Deref(ARG1);
Term t2 = Deref(ARG2);
static void
purge_clauses(PredEntry *pred)
{
yamop *q;
SMALLUNSGN mod;
int in_use;
Yap_PutValue(AtomAbol, MkAtomTerm(AtomNil));
if (IsVarTerm(t))
return (FALSE);
if (IsVarTerm(t2) || !IsAtomTerm(t2)) {
return (FALSE);
}
mod = Yap_LookupModule(t2);
if (IsAtomTerm(t)) {
Atom at = AtomOfTerm(t);
pred = RepPredProp(PredPropByAtom(at, mod));
} else if (IsApplTerm(t)) {
Functor fun = FunctorOfTerm(t);
pred = RepPredProp(PredPropByFunc(fun, mod));
} else
return (FALSE);
WRITE_LOCK(pred->PRWLock);
if (pred->PredFlags & StandardPredFlag) {
WRITE_UNLOCK(pred->PRWLock);
Yap_Error(PERMISSION_ERROR_MODIFY_STATIC_PROCEDURE, t, "assert/1");
return (FALSE);
}
if (pred->PredFlags & IndexedPredFlag)
RemoveIndexation(pred);
Yap_PutValue(AtomAbol, MkAtomTerm(AtomTrue));
@ -1598,6 +1573,44 @@ p_purge_clauses(void)
pred->src.OwnerFile = AtomNil;
if (pred->PredFlags & MultiFileFlag)
pred->PredFlags ^= MultiFileFlag;
}
void
Yap_Abolish(PredEntry *pred)
{
purge_clauses(pred);
}
static Int
p_purge_clauses(void)
{ /* '$purge_clauses'(+Func) */
PredEntry *pred;
Term t = Deref(ARG1);
Term t2 = Deref(ARG2);
SMALLUNSGN mod;
Yap_PutValue(AtomAbol, MkAtomTerm(AtomNil));
if (IsVarTerm(t))
return (FALSE);
if (IsVarTerm(t2) || !IsAtomTerm(t2)) {
return (FALSE);
}
mod = Yap_LookupModule(t2);
if (IsAtomTerm(t)) {
Atom at = AtomOfTerm(t);
pred = RepPredProp(PredPropByAtom(at, mod));
} else if (IsApplTerm(t)) {
Functor fun = FunctorOfTerm(t);
pred = RepPredProp(PredPropByFunc(fun, mod));
} else
return (FALSE);
WRITE_LOCK(pred->PRWLock);
if (pred->PredFlags & StandardPredFlag) {
WRITE_UNLOCK(pred->PRWLock);
Yap_Error(PERMISSION_ERROR_MODIFY_STATIC_PROCEDURE, t, "assert/1");
return (FALSE);
}
purge_clauses(pred);
WRITE_UNLOCK(pred->PRWLock);
return (TRUE);
}

View File

@ -1251,7 +1251,7 @@ c_goal(Term Goal, int mod, compiler_struct *cglobs)
return;
}
#endif /* YAPOR */
p = RepPredProp(p0 = PredPropByAtom(atom, mod));
p = RepPredProp(p0 = Yap_PredPropByAtomNonThreadLocal(atom, mod));
/* if we are profiling, make sure we register we entered this predicate */
if (profiling)
Yap_emit(enter_profiling_op, (CELL)p, Zero, &cglobs->cint);
@ -1260,7 +1260,7 @@ c_goal(Term Goal, int mod, compiler_struct *cglobs)
}
else {
f = FunctorOfTerm(Goal);
p = RepPredProp(p0 = PredPropByFunc(f, mod));
p = RepPredProp(p0 = Yap_PredPropByFunctorNonThreadLocal(f, mod));
if (f == FunctorOr) {
CELL l = ++cglobs->labelno;
CELL m = ++cglobs->labelno;

View File

@ -6556,10 +6556,12 @@ Yap_FollowIndexingCode(PredEntry *ap, yamop *ipc, Term t1, Term tb, Term tr, yam
case _unlock_lu:
ipc = NEXTOP(ipc,e);
break;
#if THREADS
case _thread_local:
break;
#if THREADS
ap = Yap_GetThreadPred(ap);
ipc = ap->CodeOfPred;
#endif
break;
case _index_pred:
case _spy_pred:
Yap_IPred(ap);

View File

@ -962,7 +962,7 @@ InitCodes(void)
heap_regs->functor_stream = Yap_MkFunctor (AtomStream, 1);
heap_regs->functor_stream_pos = Yap_MkFunctor (AtomStreamPos, 3);
heap_regs->functor_stream_eOS = Yap_MkFunctor (Yap_LookupAtom("end_of_stream"), 1);
heap_regs->functor_thread_run = Yap_MkFunctor (Yap_FullLookupAtom("$top_thread_goal"), 1);
heap_regs->functor_thread_run = Yap_MkFunctor (Yap_FullLookupAtom("$top_thread_goal"), 2);
heap_regs->functor_change_module = Yap_MkFunctor (Yap_FullLookupAtom("$change_module"), 1);
heap_regs->functor_current_module = Yap_MkFunctor (Yap_FullLookupAtom("$current_module"), 1);
FunctorThrow = Yap_MkFunctor( Yap_FullLookupAtom("throw"), 1);

View File

@ -52,22 +52,39 @@ allocate_new_tid(void)
}
static void
store_specs(int new_worker_id, UInt ssize, UInt tsize, Term tgoal)
store_specs(int new_worker_id, UInt ssize, UInt tsize, Term tgoal, Term tdetach)
{
ThreadHandle[new_worker_id].ssize = ssize;
ThreadHandle[new_worker_id].tsize = tsize;
ThreadHandle[new_worker_id].tgoal =
Yap_StoreTermInDB(tgoal,4);
ThreadHandle[new_worker_id].cmod =
CurrentModule;
if (IsVarTerm(tdetach))
tdetach = MkAtomTerm(AtomFalse);
ThreadHandle[new_worker_id].tdetach =
tdetach;
}
static void
thread_die(void)
{
Prop p0 = AbsPredProp(heap_regs->thread_handle[worker_id].local_preds);
/* kill all thread local preds */
while(p0) {
PredEntry *ap = RepPredProp(p0);
p0 = ap->NextOfPE;
Yap_Abolish(ap);
Yap_FreeCodeSpace((char *)ap);
}
Yap_KillStacks();
LOCK(ThreadHandlesLock);
ActiveSignals = 0L;
free(ScratchPad.ptr);
free(ThreadHandle[worker_id].default_yaam_regs);
ThreadHandle[worker_id].in_use = FALSE;
free((void *)ThreadHandle[worker_id].default_yaam_regs);
UNLOCK(ThreadHandlesLock);
}
@ -75,7 +92,7 @@ static void *
thread_run(void *widp)
{
Term tgoal;
Term tgs[1];
Term tgs[2];
int out;
REGSTORE *standard_regs = (REGSTORE *)malloc(sizeof(REGSTORE));
int myworker_id = *((int *)widp);
@ -85,17 +102,25 @@ thread_run(void *widp)
pthread_setspecific(Yap_yaamregs_key, (void *)standard_regs);
worker_id = myworker_id;
Yap_InitExStacks(ThreadHandle[myworker_id].ssize, ThreadHandle[myworker_id].tsize);
CurrentModule = ThreadHandle[myworker_id].cmod;
Yap_InitYaamRegs();
{
Yap_ReleasePreAllocCodeSpace(Yap_PreAllocCodeSpace());
}
tgs[0] = Yap_FetchTermFromDB(ThreadHandle[worker_id].tgoal);
tgoal = Yap_MkApplTerm(FunctorThreadRun, 1, tgs);
tgs[1] = ThreadHandle[worker_id].tdetach;
tgoal = Yap_MkApplTerm(FunctorThreadRun, 2, tgs);
out = Yap_RunTopGoal(tgoal);
thread_die();
return NULL;
}
static Int
p_thread_new_tid(void)
{
return Yap_unify(MkIntegerTerm(allocate_new_tid()), ARG1);
}
static Int
p_create_thread(void)
{
@ -103,16 +128,17 @@ p_create_thread(void)
UInt tsize = IntegerOfTerm(Deref(ARG3));
/* UInt systemsize = IntegerOfTerm(Deref(ARG4)); */
Term tgoal = Deref(ARG1);
int new_worker_id = allocate_new_tid();
Term tdetach = Deref(ARG5);
int new_worker_id = IntegerOfTerm(Deref(ARG6));
if (new_worker_id == -1) {
/* YAP ERROR */
return FALSE;
}
ThreadHandle[new_worker_id].id = new_worker_id;
store_specs(new_worker_id, ssize, tsize, tgoal);
if ((ThreadHandle[new_worker_id].ret = pthread_create(&(ThreadHandle[new_worker_id].handle), NULL, thread_run, (void *)(&(ThreadHandle[new_worker_id].id)))) == 0)
return Yap_unify(MkIntegerTerm(new_worker_id), ARG5);
thread_die();
store_specs(new_worker_id, ssize, tsize, tgoal, tdetach);
if ((ThreadHandle[new_worker_id].ret = pthread_create(&(ThreadHandle[new_worker_id].handle), NULL, thread_run, (void *)(&(ThreadHandle[new_worker_id].id)))) == 0) {
return TRUE;
}
/* YAP ERROR */
return FALSE;
}
@ -148,15 +174,9 @@ p_thread_detach(void)
static Int
p_thread_exit(void)
{
pthread_exit(NULL);
return TRUE;
}
static Int
p_thread_die(void)
{
thread_die();
pthread_exit(NULL);
return TRUE;
}
@ -223,7 +243,7 @@ p_new_mutex(void)
static Int
p_destroy_mutex(void)
{
SWIMutex *mut = (SWIMutex*)Deref(ARG1);
SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
if (pthread_mutex_destroy(&mut->m) < 0)
return FALSE;
@ -234,7 +254,7 @@ p_destroy_mutex(void)
static Int
p_lock_mutex(void)
{
SWIMutex *mut = (SWIMutex*)Deref(ARG1);
SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
if (pthread_mutex_lock(&mut->m) < 0)
return FALSE;
@ -246,7 +266,7 @@ p_lock_mutex(void)
static Int
p_trylock_mutex(void)
{
SWIMutex *mut = (SWIMutex*)Deref(ARG1);
SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
if (pthread_mutex_trylock(&mut->m) == EBUSY)
return FALSE;
@ -258,7 +278,7 @@ p_trylock_mutex(void)
static Int
p_unlock_mutex(void)
{
SWIMutex *mut = (SWIMutex*)Deref(ARG1);
SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
if (pthread_mutex_unlock(&mut->m) < 0)
return FALSE;
@ -269,7 +289,7 @@ p_unlock_mutex(void)
static Int
p_info_mutex(void)
{
SWIMutex *mut = (SWIMutex*)Deref(ARG1);
SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
return Yap_unify(ARG2, MkIntegerTerm(mut->owners)) &&
Yap_unify(ARG2, MkIntegerTerm(mut->tid_own));
@ -292,7 +312,7 @@ p_cond_create(void)
static Int
p_cond_destroy(void)
{
pthread_cond_t *condp = (pthread_cond_t *)Deref(ARG1);
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
if (pthread_cond_destroy(condp) < 0)
return FALSE;
@ -303,7 +323,7 @@ p_cond_destroy(void)
static Int
p_cond_signal(void)
{
pthread_cond_t *condp = (pthread_cond_t *)Deref(ARG1);
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
if (pthread_cond_signal(condp) < 0)
return FALSE;
@ -313,7 +333,7 @@ p_cond_signal(void)
static Int
p_cond_broadcast(void)
{
pthread_cond_t *condp = (pthread_cond_t *)Deref(ARG1);
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
if (pthread_cond_broadcast(condp) < 0)
return FALSE;
@ -323,11 +343,10 @@ p_cond_broadcast(void)
static Int
p_cond_wait(void)
{
pthread_cond_t *condp = (pthread_cond_t *)Deref(ARG1);
SWIMutex *mut = (SWIMutex*)Deref(ARG2);
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG2));
if (pthread_cond_wait(condp, &mut->m) < 0)
return FALSE;
pthread_cond_wait(condp, &mut->m);
return TRUE;
}
@ -353,7 +372,7 @@ p_install_thread_local(void)
if (pe->PredFlags & (UserCPredFlag|HiddenPredFlag|CArgsPredFlag|SourcePredFlag|SyncPredFlag|TestPredFlag|AsmPredFlag|StandardPredFlag|DynamicPredFlag|CPredFlag|SafePredFlag|IndexedPredFlag|BinaryTestPredFlag|SpiedPredFlag)) {
return FALSE;
}
pe->PredFlags |= ThreadLocalPredFlag;
pe->PredFlags |= (ThreadLocalPredFlag|LogUpdatePredFlag);
pe->OpcodeOfPred = Yap_opcode(_thread_local);
pe->CodeOfPred = (yamop *)&pe->OpcodeOfPred;
WRITE_UNLOCK(pe->PRWLock);
@ -371,14 +390,21 @@ p_thread_signal(void)
return TRUE;
}
static Int
p_no_threads(void)
{ /* '$thread_signal'(+P) */
return FALSE;
}
void Yap_InitThreadPreds(void)
{
Yap_InitCPred("$create_thread", 5, p_create_thread, 0);
Yap_InitCPred("$no_threads", 0, p_no_threads, 0);
Yap_InitCPred("$thread_new_tid", 1, p_thread_new_tid, 0);
Yap_InitCPred("$create_thread", 6, p_create_thread, 0);
Yap_InitCPred("$thread_self", 1, p_thread_self, SafePredFlag);
Yap_InitCPred("$thread_join", 1, p_thread_join, 0);
Yap_InitCPred("$detach_thread", 1, p_thread_detach, 0);
Yap_InitCPred("$thread_exit", 0, p_thread_exit, 0);
Yap_InitCPred("$thread_die", 0, p_thread_die, 0);
Yap_InitCPred("thread_set_concurrency", 2, p_thread_set_concurrency, 0);
Yap_InitCPred("$valid_thread", 1, p_valid_thread, 0);
Yap_InitCPred("$new_mutex", 1, p_new_mutex, SafePredFlag);
@ -393,7 +419,21 @@ void Yap_InitThreadPreds(void)
Yap_InitCPred("$cond_broadcast", 1, p_cond_broadcast, SafePredFlag);
Yap_InitCPred("$cond_wait", 2, p_cond_wait, SafePredFlag);
Yap_InitCPred("$install_thread_local", 2, p_install_thread_local, SafePredFlag);
Yap_InitCPred("$thread_signal", 2, p_thread_signal, SafePredFlag);
Yap_InitCPred("$signal_thread", 1, p_thread_signal, SafePredFlag);
}
#else
static Int
p_no_threads(void)
{ /* '$thread_signal'(+P) */
return TRUE;
}
void Yap_InitThreadPreds(void)
{
Yap_InitCPred("$no_threads", 0, p_create_thread, 0);
}

View File

@ -10,7 +10,7 @@
* File: Heap.h *
* mods: *
* comments: Heap Init Structure *
* version: $Id: Heap.h,v 1.53 2004-02-09 14:19:04 vsc Exp $ *
* version: $Id: Heap.h,v 1.54 2004-02-11 01:20:56 vsc Exp $ *
*************************************************************************/
/* information that can be stored in Code Space */
@ -65,6 +65,8 @@ typedef struct thandle {
int in_use;
UInt ssize;
UInt tsize;
Term tdetach;
SMALLUNSGN cmod;
struct DB_TERM *tgoal;
int id;
int ret;

View File

@ -10,7 +10,7 @@
* File: Yap.proto *
* mods: *
* comments: Function declarations for YAP *
* version: $Id: Yapproto.h,v 1.44 2004-02-06 02:26:23 vsc Exp $ *
* version: $Id: Yapproto.h,v 1.45 2004-02-11 01:20:56 vsc Exp $ *
*************************************************************************/
/* prototype file for Yap */
@ -31,6 +31,8 @@ Atom STD_PROTO(Yap_FullLookupAtom,(char *));
void STD_PROTO(Yap_LookupAtomWithAddress,(char *,AtomEntry *));
Prop STD_PROTO(Yap_NewPredPropByFunctor,(struct FunctorEntryStruct *, SMALLUNSGN));
Prop STD_PROTO(Yap_NewPredPropByAtom,(struct AtomEntryStruct *, SMALLUNSGN));
Prop STD_PROTO(Yap_PredPropByFunctorNonThreadLocal,(struct FunctorEntryStruct *, SMALLUNSGN));
Prop STD_PROTO(Yap_PredPropByAtomNonThreadLocal,(struct AtomEntryStruct *, SMALLUNSGN));
Functor STD_PROTO(Yap_UnlockedMkFunctor,(AtomEntry *,unsigned int));
Functor STD_PROTO(Yap_MkFunctor,(Atom,unsigned int));
void STD_PROTO(Yap_MkFunctorWithAddress,(Atom,unsigned int,FunctorEntry *));
@ -112,6 +114,7 @@ int STD_PROTO(where_new_clause, (Prop, int));
#endif
void STD_PROTO(Yap_init_consult,(int, char *));
void STD_PROTO(Yap_end_consult,(void));
void STD_PROTO(Yap_Abolish,(struct pred_entry *));
/* cmppreds.c */

View File

@ -496,7 +496,7 @@ exec_top_level(int BootMode, YAP_init_args *iap)
char init_file[256];
YAP_Atom atfile;
YAP_Functor fgoal;
YAP_Term goal, as[1];
YAP_Term goal, as[2];
#if HAVE_STRNCAT
strncpy(init_file, PL_SRC_DIR, 256);
@ -511,8 +511,9 @@ exec_top_level(int BootMode, YAP_init_args *iap)
/* consult init file */
atfile = YAP_LookupAtom(init_file);
as[0] = YAP_MkAtomTerm(atfile);
fgoal = YAP_MkFunctor(YAP_FullLookupAtom("$consult"), 1);
goal = YAP_MkApplTerm(fgoal, 1, as);
as[1] = YAP_MkAtomTerm(YAP_LookupAtom("prolog"));
fgoal = YAP_MkFunctor(YAP_FullLookupAtom("$consult"), 2);
goal = YAP_MkApplTerm(fgoal, 2, as);
/* launch consult */
YAP_RunGoal(goal);
/* set default module to user */

View File

@ -387,6 +387,9 @@ print_message(Level, Mss) :-
'$output_error_message'(domain_error(syntax_error_handler,What), Where) :-
'$format'(user_error,"[ DOMAIN ERROR- ~w: ~w not a syntax error handler ]~n",
[Where,What]).
'$output_error_message'(domain_error(thread_create_option,Option+Opts), Where) :-
'$format'(user_error,"[ DOMAIN ERROR- ~w: ~w not in ~w ]~n",
[Where,Option, Opts]).
'$output_error_message'(domain_error(time_out_spec,What), Where) :-
'$format'(user_error,"[ DOMAIN ERROR- ~w: ~w not a valid specification for a time out ]~n",
[Where,What]).

View File

@ -20,7 +20,17 @@
thread_at_exit(:),
thread_signal(+,:).
'$top_thread_goal'(G) :-
:- initialization('$init_thread0').
'$init_thread0' :-
no_threads, !.
'$init_thread0' :-
'$create_mq'(0).
'$top_thread_goal'(G, Detached) :-
'$thread_self'(Id),
(Detached == true -> '$detach_thread'(Id) ; true),
'$current_module'(Module),
'$system_catch'((G,'$close_thread'),Module,Exception,'$thread_exception'(Exception)).
@ -43,14 +53,14 @@ thread_create(Goal, Id, Options) :-
G0 = thread_create(Goal, Id, Options),
'$check_callable'(Goal,G0),
'$thread_options'(Options, Aliases, Stack, Trail, System, Detached, G0),
'$create_thread'(Goal, Stack, Trail, System, Id),
'$thread_new_tid'(Id),
'$add_thread_aliases'(Aliases, Id),
'$clean_db_on_id'(Id),
(Detached == true -> '$detach_thread'(Id) ; true),
'$create_mq'(Id),
'$add_thread_aliases'(Aliases, Id).
'$create_mq'(Id),
'$create_thread'(Goal, Stack, Trail, System, Detached, Id).
'$clean_db_on_id'(Id) :-
recorda('$thread_exit_status', [Id|_], R),
recorded('$thread_exit_status', [Id|_], R),
erase(R),
fail.
'$clean_db_on_id'(Id) :-
@ -70,10 +80,12 @@ thread_create(Goal, Id, Options) :-
'$thread_ground_stacks'(Stack),
'$thread_ground_stacks'(Trail),
'$thread_ground_stacks'(System).
'$thread_options'([Opt|OPts], Aliases, Stack, Trail, System, Detached, G0) :-
'$thread_option'(OPt, Aliases, Stack, Trail, System, Detached, G0, Aliases0),
'$thread_options'([Opt|Opts], Aliases, Stack, Trail, System, Detached, G0) :-
'$thread_option'(Opt, Aliases, Stack, Trail, System, Detached, G0, Aliases0),
'$thread_options'(Opts, Aliases0, Stack, Trail, System, Detached, G0).
'$thread_option'(Option, Aliases, _, _, _, _, G0, Aliases) :- var(Option), !,
'$do_error'(instantiation_error,G0).
'$thread_option'(stacks(Stack), Aliases, Stack, _, _, _, G0, Aliases) :- !,
( \+ integer(Stack) -> '$do_error'(type_error(integer,Stack),G0) ; true ).
'$thread_option'(trail(Trail), Aliases, _, Trail, _, _, G0, Aliases) :- !,
@ -85,7 +97,7 @@ thread_create(Goal, Id, Options) :-
'$thread_option'(detached(B), Aliases, _, _, _, B, G0, Aliases) :- !,
( B \== true, B \== false -> '$do_error'(domain_error(flag_value,B+[true,false]),G0) ; true ).
'$thread_option'(Option, Aliases, _, _, _, _, G0, Aliases) :-
'$do_error'(domain_error(thread_option,Option+[stacks(_),trail(_),system(_),alias(_),detached(_)]),G0).
'$do_error'(domain_error(thread_create_option,Option+[stacks(_),trail(_),system(_),alias(_),detached(_)]),G0).
'$thread_ground_stacks'(0) :- !.
'$thread_ground_stacks'(_).
@ -170,8 +182,8 @@ current_thread(Tid, Status) :-
mutex_create(V) :-
var(V), !,
'$new_mutex'(Id),
recorda('$mutex'(Id,Id),_).
'$new_mutex'(V),
recorda('$mutex',[V|V],_).
mutex_create(A) :-
atom(A),
recorded('$mutex',[A|_],_), !,
@ -179,7 +191,7 @@ mutex_create(A) :-
mutex_create(A) :-
atom(A), !,
'$new_mutex'(Id),
recorda('$mutex'(A,Id),_).
recorda('$mutex',[A|Id],_).
mutex_create(V) :-
'$do_error'(type_error(atom,V),mutex_create(V)).
@ -187,8 +199,8 @@ mutex_destroy(V) :-
var(V), !,
'$do_error'(instantiation_error,mutex_destroy(A)).
mutex_destroy(A) :-
recorded('$mutex',[A|Id],R),
'$kill_mutex'(Id),
recorded('$mutex',[A|Id],R), !,
'$destroy_mutex'(Id),
erase(R).
mutex_destroy(A) :-
atom(A), !,
@ -200,7 +212,7 @@ mutex_lock(V) :-
var(V), !,
'$do_error'(instantiation_error,mutex_lock(A)).
mutex_lock(A) :-
recorded('$mutex',[A|Id],_),
recorded('$mutex',[A|Id],_), !,
'$lock_mutex'(Id).
mutex_lock(A) :-
atom(A), !,
@ -213,7 +225,7 @@ mutex_trylock(V) :-
var(V), !,
'$do_error'(instantiation_error,mutex_trylock(A)).
mutex_trylock(A) :-
recorded('$mutex',[A|Id],_),
recorded('$mutex',[A|Id],_), !,
'$trylock_mutex'(Id).
mutex_trylock(A) :-
atom(A), !,
@ -226,7 +238,7 @@ mutex_unlock(V) :-
var(V), !,
'$do_error'(instantiation_error,mutex_unlock(A)).
mutex_unlock(A) :-
recorded('$mutex',[A|Id],_),
recorded('$mutex',[A|Id],_), !,
( '$unlock_mutex'(Id) ->
true
;
@ -263,14 +275,14 @@ message_queue_create(Cond) :-
'$cond_create'(Cond),
recorda('$queue',q(Cond,Mutex,Cond), _).
message_queue_create(Name) :-
atom(Name), !,
recorded('$thread_alias',[Name|_],_),
'$do_error'(permission_error(create,queue,Name),thread_queue_create(Name)).
atom(Name),
recorded('$thread_alias',[Name|_],_), !,
'$do_error'(permission_error(create,queue,Name),message_queue_create(Name)).
message_queue_create(Name) :-
atom(Name), !,
'$create_mq'(Name).
message_queue_create(Name) :-
'$do_error'(type_error(atom,Name),thread_queue_create(Name)).
'$do_error'(type_error(atom,Name),message_queue_create(Name)).
'$create_mq'(Name) :-
mutex_create(Mutex),
@ -280,7 +292,7 @@ message_queue_create(Name) :-
message_queue_destroy(Name) :-
var(Name), !,
'$do_error'(instantiation_error,thread_queue_destroy(Name)).
'$do_error'(instantiation_error,message_queue_destroy(Name)).
message_queue_destroy(Queue) :-
recorded('$queue',q(Queue,Mutex,Cond),R), !,
erase(R),
@ -289,9 +301,9 @@ message_queue_destroy(Queue) :-
'$clean_mqueue'(Queue).
message_queue_destroy(Queue) :-
atom(Queue), !,
'$do_error'(existence_error(queue,Queue),thread_queue_destroy(Name)).
'$do_error'(existence_error(queue,Queue),message_queue_destroy(Name)).
message_queue_destroy(Name) :-
'$do_error'(type_error(atom,Name),thread_queue_destroy(Name)).
'$do_error'(type_error(atom,Name),message_queue_destroy(Name)).
'$clean_mqueue'(Q) :-
recorded('$msg_queue',q(Queue,_),R),
@ -300,7 +312,7 @@ message_queue_destroy(Name) :-
'$clean_mqueue'(_).
thread_send_message(Queue, Term) :-
recorded('$thread_alias',[Queue|Id],_),
recorded('$thread_alias',[Queue|Id],_), !,
thread_send_message(Id, Term).
thread_send_message(Queue, Term) :-
recorded('$queue',q(Queue,Mutex,Cond),_),
@ -320,8 +332,8 @@ thread_get_message(Queue, Term) :-
'$thread_get_message_loop'(Queue, Term, Mutex, Cond) :-
recorded('$msg_queue',q(Queue,Term),R), !,
mutex_unlock(Mutex),
erase(R).
erase(R),
mutex_unlock(Mutex).
'$thread_get_message_loop'(Queue, Term, Mutex, Cond) :-
'$cond_wait'(Cond, Mutex),
'$thread_get_message_loop'(Queue, Term, Mutex, Cond).
@ -335,13 +347,17 @@ thread_peek_message(Queue, Term) :-
mutex_lock(Mutex),
'$thread_peek_message2'(Queue, Term, Mutex).
'$thread_get_message_loop'(Queue, Term, Mutex) :-
'$thread_peek_message2'(Queue, Term, Mutex) :-
recorded('$msg_queue',q(Queue,Term),_), !,
mutex_unlock(Mutex).
'$thread_get_message_loop'(Queue, Term, Mutex) :-
'$thread_peek_message2'(Queue, Term, Mutex) :-
mutex_unlock(Mutex),
fail.
thread_local(X) :-
'$current_module'(M),
'$thread_local'(X,M).
'$thread_local'(X,M) :- var(X), !,
'$do_error'(instantiation_error,thread_local(M:X)).
'$thread_local'(Mod:Spec,_) :- !,
@ -376,11 +392,9 @@ thread_signal(Thread, Goal) :-
'$do_error'(type_error(integer,Thread),thread_signal(Thread, Goal)).
'$thread_signal'(Thread, Goal) :-
mutex_lock(Thread),
( recorded('$thread_signal',[Thread|_],R), erase(R), fail ; true ),
recorda('$thread_signal',[Thread|Goal],_),
'$signal_thread'(Thread).
mutex_unlock(Thread).
'$thread_gfetch'(G) :-
'$thread_self'(Id),