This repository has been archived on 2023-08-20. You can view files and clone it, but cannot push or open issues or pull requests.
yap-6.3/C/threads.c
Vítor Santos Costa f5b50a4179 fix locks
2015-06-19 10:10:02 +01:00

1845 lines
48 KiB
C

/*************************************************************************
* *
* YAP Prolog *
* *
* Yap Prolog was developed at NCCUP - Universidade do Porto *
* *
* Copyright L.Damas, V.S.Costa and Universidade do Porto 1985-1997 *
* *
**************************************************************************
* *
* File: stdpreds.c *
* Last rev: *
* mods: *
* comments: threads *
* *
*************************************************************************/
#ifdef SCCS
static char SccsId[] = "%W% %G%";
#endif
/**
@ingroup Threads
@{
*/
#include "Yap.h"
#include "Yatom.h"
#include "YapHeap.h"
#include "eval.h"
#include "yapio.h"
#include "blobs.h"
#include <stdio.h>
#if HAVE_STRING_H
#include <string.h>
#endif
#ifdef TABLING
#include "tab.macros.h"
#endif /* TABLING */
blob_type_t PL_Message_Queue = {
YAP_BLOB_MAGIC_B,
PL_BLOB_UNIQUE | PL_BLOB_NOCOPY,
"message_queue",
0, // release
0, // compare
0, // write
0 // acquire
};
#if DEBUG_LOCKS||DEBUG_PE_LOCKS
int debug_locks = FALSE, debug_pe_locks = FALSE;
static Int p_debug_locks( USES_REGS1 ) { debug_pe_locks = 1; return TRUE; }
static Int p_nodebug_locks( USES_REGS1 ) { debug_locks = 0; debug_pe_locks = 0; return TRUE; }
#endif
#if THREADS
#include "threads.h"
int
Yap_ThreadID( void )
{
int new_worker_id = 0;
pthread_t self = pthread_self();
while(new_worker_id < MAX_THREADS &&
Yap_local[new_worker_id] &&
(REMOTE_ThreadHandle(new_worker_id).in_use == TRUE ||
REMOTE_ThreadHandle(new_worker_id).zombie == TRUE) ) {
if (pthread_equal(self , REMOTE_ThreadHandle(new_worker_id).pthread_handle) ) {
return new_worker_id;
}
new_worker_id++;
}
return -1;
}
int
Yap_NOfThreads(void) {
// GLOBAL_ThreadHandlesLock is held
return GLOBAL_NOfThreads;
}
static int
allocate_new_tid(void)
{
int new_worker_id = 0;
LOCK(GLOBAL_ThreadHandlesLock);
while(new_worker_id < MAX_THREADS &&
Yap_local[new_worker_id] &&
(REMOTE_ThreadHandle(new_worker_id).in_use == TRUE ||
REMOTE_ThreadHandle(new_worker_id).zombie == TRUE) )
new_worker_id++;
if (new_worker_id >= MAX_THREADS) {
new_worker_id = -1;
} else if (!Yap_local[new_worker_id]) {
if (!Yap_InitThread(new_worker_id)) {
UNLOCK(GLOBAL_ThreadHandlesLock);
return -1;
}
MUTEX_LOCK(&(REMOTE_ThreadHandle(new_worker_id).tlock));
REMOTE_ThreadHandle(new_worker_id).in_use = TRUE;
} else if (new_worker_id < MAX_THREADS) {
// reuse existing thread
MUTEX_LOCK(&(REMOTE_ThreadHandle(new_worker_id).tlock));
REMOTE_ThreadHandle(new_worker_id).in_use = TRUE;
} else {
new_worker_id = -1;
}
UNLOCK(GLOBAL_ThreadHandlesLock);
return new_worker_id;
}
static bool
mboxCreate( Term namet, mbox_t *mboxp USES_REGS )
{
pthread_mutex_t *mutexp;
pthread_cond_t *condp;
struct idb_queue *msgsp;
memset(mboxp, 0, sizeof(mbox_t));
condp = & mboxp->cond;
pthread_cond_init(condp, NULL);
mutexp = & mboxp->mutex;
pthread_mutex_init(mutexp, NULL);
msgsp = & mboxp->msgs;
mboxp->nmsgs = 0;
mboxp->nclients = 0;
Yap_init_tqueue(msgsp);
// match at the end, when everything is built.
mboxp->name = namet;
mboxp->open = true;
return true;
}
static bool
mboxDestroy( mbox_t *mboxp USES_REGS )
{
pthread_mutex_t *mutexp = &mboxp->mutex;
pthread_cond_t *condp = &mboxp->cond;
struct idb_queue *msgsp = &mboxp->msgs;
mboxp->open = false;
if (mboxp->nclients == 0 ) {
pthread_cond_destroy(condp);
pthread_mutex_destroy(mutexp);
Yap_destroy_tqueue(msgsp PASS_REGS);
// at this point, there is nothing left to unlock!
return true;
} else {
/* we have clients in the mailbox, try to wake them up one by one */
pthread_cond_broadcast(condp);
pthread_mutex_unlock(mutexp);
return true;
}
}
static bool
mboxSend( mbox_t *mboxp, Term t USES_REGS )
{
pthread_mutex_t *mutexp = &mboxp->mutex;
pthread_cond_t *condp = &mboxp->cond;
struct idb_queue *msgsp = &mboxp->msgs;
if (!mboxp->open) {
// oops, dead mailbox
return false;
}
Yap_enqueue_tqueue(msgsp, t PASS_REGS);
// printf("+ (%d) %d/%d\n", worker_id,mboxp->nclients, mboxp->nmsgs);
mboxp->nmsgs++;
pthread_cond_broadcast(condp);
pthread_mutex_unlock(mutexp);
return true;
}
static bool
mboxReceive( mbox_t *mboxp, Term t USES_REGS )
{
pthread_mutex_t *mutexp = &mboxp->mutex;
pthread_cond_t *condp = &mboxp->cond;
struct idb_queue *msgsp = &mboxp->msgs;
bool rc;
if (!mboxp->open){
return false; // don't try to read if someone else already closed down...
}
mboxp->nclients++;
do {
rc = mboxp->nmsgs && Yap_dequeue_tqueue(msgsp, t, false, true PASS_REGS);
if (rc) {
mboxp->nclients--;
mboxp->nmsgs--;
//printf("- (%d) %d/%d\n", worker_id,mboxp->nclients, mboxp->nmsgs);
// Yap_do_low_level_trace=1;
pthread_mutex_unlock(mutexp);
return true;
} else if (!mboxp->open) {
//printf("o (%d)\n", worker_id);
mboxp->nclients--;
if (!mboxp->nclients) {// release
pthread_cond_destroy(condp);
pthread_mutex_destroy(mutexp);
Yap_destroy_tqueue(msgsp PASS_REGS);
// at this point, there is nothing left to unlock!
} else {
pthread_cond_broadcast(condp);
pthread_mutex_unlock(mutexp);
}
return false;
} else {
pthread_cond_wait(condp, mutexp);
}
} while (!rc);
return rc;
}
static bool
mboxPeek( mbox_t *mboxp, Term t USES_REGS )
{
pthread_mutex_t *mutexp = &mboxp->mutex;
struct idb_queue *msgsp = &mboxp->msgs;
bool rc = Yap_dequeue_tqueue(msgsp, t, false, false PASS_REGS);
pthread_mutex_unlock(mutexp);
return rc;
}
static int
store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term tgoal, Term tdetach, Term texit)
{
CACHE_REGS
UInt pm; /* memory to be requested */
Term tmod;
if (tsize < MinTrailSpace)
tsize = MinTrailSpace;
if (ssize < MinStackSpace)
ssize = MinStackSpace;
REMOTE_ThreadHandle(new_worker_id).ssize = ssize;
REMOTE_ThreadHandle(new_worker_id).tsize = tsize;
REMOTE_ThreadHandle(new_worker_id).sysize = sysize;
if ((REGSTORE *)pthread_getspecific(Yap_yaamregs_key)) {
REMOTE_c_input_stream(new_worker_id) = LOCAL_c_input_stream;
REMOTE_c_output_stream(new_worker_id) = LOCAL_c_output_stream;
REMOTE_c_error_stream(new_worker_id) = LOCAL_c_error_stream;
} else {
// thread is created by a thread that has never run Prolog
REMOTE_c_input_stream(new_worker_id) = REMOTE_c_input_stream(0);
REMOTE_c_output_stream(new_worker_id) = REMOTE_c_output_stream(0);
REMOTE_c_error_stream(new_worker_id) = REMOTE_c_error_stream(0);
}
pm = (ssize + tsize)*K1;
if (!(REMOTE_ThreadHandle(new_worker_id).stack_address = malloc(pm))) {
return FALSE;
}
REMOTE_ThreadHandle(new_worker_id).tgoal =
Yap_StoreTermInDB(Deref(tgoal), 7);
if (CurrentModule) {
REMOTE_ThreadHandle(new_worker_id).cmod =
CurrentModule;
} else {
REMOTE_ThreadHandle(new_worker_id).cmod = USER_MODULE;
}
tdetach = Deref(tdetach);
if (IsVarTerm(tdetach)){
REMOTE_ThreadHandle(new_worker_id).tdetach =
MkAtomTerm(AtomFalse);
} else {
REMOTE_ThreadHandle(new_worker_id).tdetach =
tdetach;
}
tmod = CurrentModule;
texit = Yap_StripModule(Deref(texit), &tmod);
if (IsAtomTerm(tmod)) {
REMOTE_ThreadHandle(new_worker_id).texit_mod = tmod;
} else {
Yap_Error(TYPE_ERROR_ATOM,tmod,"module in exit call should be an atom");
}
REMOTE_ThreadHandle(new_worker_id).texit =
Yap_StoreTermInDB(texit,7);
REMOTE_ThreadHandle(new_worker_id).local_preds =
NULL;
REMOTE_ThreadHandle(new_worker_id).start_of_timesp =
NULL;
REMOTE_ThreadHandle(new_worker_id).last_timep =
NULL;
REMOTE_ScratchPad(new_worker_id).ptr =
NULL;
// reset arena info
REMOTE_GlobalArena(new_worker_id) =0;
return TRUE;
}
static void
kill_thread_engine (int wid, int always_die)
{
Prop p0 = AbsPredProp(REMOTE_ThreadHandle(wid).local_preds);
GlobalEntry *gl = REMOTE_GlobalVariables(wid);
REMOTE_ThreadHandle(wid).local_preds = NIL;
REMOTE_GlobalVariables(wid) = NULL;
/* kill all thread local preds */
while(p0) {
PredEntry *ap = RepPredProp(p0);
p0 = ap->NextOfPE;
Yap_Abolish(ap);
Yap_FreeCodeSpace((char *)ap);
}
while (gl) {
gl->global = TermFoundVar;
gl = gl->NextGE;
}
Yap_KillStacks(wid);
REMOTE_Signals(wid) = 0L;
// must be done before relessing the memory used to store
// thread local time.
if (!always_die) {
/* called by thread itself */
GLOBAL_ThreadsTotalTime += Yap_cputime();
}
if (REMOTE_ScratchPad(wid).ptr)
free(REMOTE_ScratchPad(wid).ptr);
// if (REMOTE_TmpPred(wid).ptr)
// free(REMOTE_TmpPred(wid).ptr);
REMOTE_ThreadHandle(wid).current_yaam_regs = NULL;
if (REMOTE_ThreadHandle(wid).start_of_timesp)
free(REMOTE_ThreadHandle(wid).start_of_timesp);
if (REMOTE_ThreadHandle(wid).last_timep)
free(REMOTE_ThreadHandle(wid).last_timep);
if (REMOTE_ThreadHandle(wid).texit) {
Yap_FreeCodeSpace((ADDR)REMOTE_ThreadHandle(wid).texit);
}
/* FreeCodeSpace requires LOCAL requires yaam_regs */
free(REMOTE_ThreadHandle(wid).default_yaam_regs);
REMOTE_ThreadHandle(wid).default_yaam_regs = NULL;
LOCK(GLOBAL_ThreadHandlesLock);
GLOBAL_NOfThreads--;
UNLOCK(GLOBAL_ThreadHandlesLock);
MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock));
if (REMOTE_ThreadHandle(wid).tdetach == MkAtomTerm(AtomTrue) ||
always_die) {
REMOTE_ThreadHandle(wid).zombie = FALSE;
REMOTE_ThreadHandle(wid).in_use = FALSE;
}
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
}
static void
thread_die(int wid, int always_die)
{
kill_thread_engine(wid, always_die);
}
static int
setup_engine(int myworker_id, int init_thread)
{
CACHE_REGS
REGSTORE *standard_regs;
standard_regs = (REGSTORE *)calloc(1,sizeof(REGSTORE));
if (!standard_regs)
return FALSE;
regcache = standard_regs;
/* create the YAAM descriptor */
REMOTE_ThreadHandle(myworker_id).default_yaam_regs = standard_regs;
REMOTE_ThreadHandle(myworker_id).current_yaam_regs = standard_regs;
Yap_InitExStacks(myworker_id, REMOTE_ThreadHandle(myworker_id).tsize, REMOTE_ThreadHandle(myworker_id).ssize);
REMOTE_SourceModule(myworker_id) = CurrentModule = REMOTE_ThreadHandle(myworker_id).cmod;
// create a mbox
mboxCreate( MkIntTerm(myworker_id), &REMOTE_ThreadHandle(myworker_id).mbox_handle PASS_REGS );
Yap_InitTime( myworker_id );
Yap_InitYaamRegs( myworker_id );
REFRESH_CACHE_REGS
Yap_ReleasePreAllocCodeSpace(Yap_PreAllocCodeSpace());
/* I exist */
GLOBAL_NOfThreadsCreated++;
GLOBAL_NOfThreads++;
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(myworker_id).tlock));
#ifdef TABLING
new_dependency_frame(REMOTE_top_dep_fr(myworker_id), FALSE, NULL, NULL, B, NULL, FALSE, NULL); /* same as in Yap_init_root_frames() */
#endif /* TABLING */
return TRUE;
}
static void
start_thread(int myworker_id)
{
CACHE_REGS
pthread_setspecific(Yap_yaamregs_key, (void *)REMOTE_ThreadHandle(myworker_id).default_yaam_regs);
REFRESH_CACHE_REGS;
worker_id = myworker_id;
LOCAL = REMOTE(myworker_id);
}
static void *
thread_run(void *widp)
{
CACHE_REGS
Term tgoal, t;
Term tgs[2];
int myworker_id = *((int *)widp);
#ifdef OUTPUT_THREADS_TABLING
char thread_name[25];
char filename[YAP_FILENAME_MAX];
sprintf(thread_name, "/thread_output_%d", myworker_id);
strcpy(filename, YAP_BINDIR);
strncat(filename, thread_name, 25);
REMOTE_thread_output(myworker_id) = fopen(filename, "w");
#endif /* OUTPUT_THREADS_TABLING */
start_thread(myworker_id);
REFRESH_CACHE_REGS;
do {
t = tgs[0] = Yap_PopTermFromDB(LOCAL_ThreadHandle.tgoal);
if (t == 0) {
if (LOCAL_Error_TYPE == OUT_OF_ATTVARS_ERROR) {
LOCAL_Error_TYPE = YAP_NO_ERROR;
if (!Yap_growglobal(NULL)) {
Yap_Error(OUT_OF_ATTVARS_ERROR, TermNil, LOCAL_ErrorMessage);
thread_die(worker_id, FALSE);
return NULL;
}
} else {
LOCAL_Error_TYPE = YAP_NO_ERROR;
if (!Yap_growstack(LOCAL_ThreadHandle.tgoal->NOfCells*CellSize)) {
Yap_Error(OUT_OF_STACK_ERROR, TermNil, LOCAL_ErrorMessage);
thread_die(worker_id, FALSE);
return NULL;
}
}
}
} while (t == 0);
REMOTE_ThreadHandle(myworker_id).tgoal = NULL;
tgs[1] = LOCAL_ThreadHandle.tdetach;
tgoal = Yap_MkApplTerm(FunctorThreadRun, 2, tgs);
Yap_RunTopGoal(tgoal);
#ifdef TABLING
{
tab_ent_ptr tab_ent;
tab_ent = GLOBAL_root_tab_ent;
while (tab_ent) {
abolish_table(tab_ent);
tab_ent = TabEnt_next(tab_ent);
}
FREE_DEPENDENCY_FRAME(REMOTE_top_dep_fr(worker_id));
REMOTE_top_dep_fr(worker_id) = NULL;
#ifdef USE_PAGES_MALLOC
DETACH_PAGES(_pages_void);
#endif /* USE_PAGES_MALLOC */
DETACH_PAGES(_pages_tab_ent);
#if defined(THREADS_FULL_SHARING) || defined(THREADS_CONSUMER_SHARING)
DETACH_PAGES(_pages_sg_ent);
#endif /* THREADS_FULL_SHARING || THREADS_CONSUMER_SHARING */
DETACH_PAGES(_pages_sg_fr);
DETACH_PAGES(_pages_dep_fr);
DETACH_PAGES(_pages_sg_node);
DETACH_PAGES(_pages_sg_hash);
DETACH_PAGES(_pages_ans_node);
DETACH_PAGES(_pages_ans_hash);
#if defined(THREADS_FULL_SHARING)
DETACH_PAGES(_pages_ans_ref_node);
#endif /* THREADS_FULL_SHARING */
DETACH_PAGES(_pages_gt_node);
DETACH_PAGES(_pages_gt_hash);
#ifdef OUTPUT_THREADS_TABLING
fclose(LOCAL_thread_output);
#endif /* OUTPUT_THREADS_TABLING */
}
#endif /* TABLING */
thread_die(worker_id, FALSE);
return NULL;
}
static Int
p_thread_new_tid( USES_REGS1 )
{
int new_worker = allocate_new_tid();
if (new_worker == -1) {
Yap_Error(RESOURCE_ERROR_MAX_THREADS, MkIntegerTerm(MAX_THREADS), "");
return FALSE;
}
return Yap_unify(MkIntegerTerm(new_worker), ARG1);
}
static int
init_thread_engine(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term tgoal, Term tdetach, Term texit)
{
return store_specs(new_worker_id, ssize, tsize, sysize, tgoal, tdetach, texit);
}
static Int
p_create_thread( USES_REGS1 )
{
UInt ssize;
UInt tsize;
UInt sysize;
Term x2 = Deref(ARG2);
Term x3 = Deref(ARG3);
Term x4 = Deref(ARG4);
int new_worker_id = IntegerOfTerm(Deref(ARG7)),
owid = worker_id;
// fprintf(stderr," %d --> %d\n", worker_id, new_worker_id);
if (IsBigIntTerm(x2))
return FALSE;
if (IsBigIntTerm(x3))
return FALSE;
ssize = IntegerOfTerm(x2);
tsize = IntegerOfTerm(x3);
sysize = IntegerOfTerm(x4);
/* UInt systemsize = IntegerOfTerm(Deref(ARG4)); */
if (new_worker_id == -1) {
/* YAP ERROR */
return FALSE;
}
/* make sure we can proceed */
if (!init_thread_engine(new_worker_id, ssize, tsize, sysize, ARG1, ARG5, ARG6))
return FALSE;
//REMOTE_ThreadHandle(new_worker_id).pthread_handle = 0L;
REMOTE_ThreadHandle(new_worker_id).id = new_worker_id;
REMOTE_ThreadHandle(new_worker_id).ref_count = 1;
setup_engine(new_worker_id, FALSE);
if ((REMOTE_ThreadHandle(new_worker_id).ret = pthread_create(&REMOTE_ThreadHandle(new_worker_id).pthread_handle, NULL, thread_run, (void *)(&(REMOTE_ThreadHandle(new_worker_id).id)))) == 0) {
pthread_setspecific(Yap_yaamregs_key, (const void *)REMOTE_ThreadHandle(owid).current_yaam_regs);
/* wait until the client is initialised */
return TRUE;
}
pthread_setspecific(Yap_yaamregs_key, (const void *)REMOTE_ThreadHandle(owid).current_yaam_regs);
return FALSE;
}
static Int
p_thread_sleep( USES_REGS1 )
{
UInt time = IntegerOfTerm(Deref(ARG1));
#if HAVE_NANOSLEEP
UInt ntime = IntegerOfTerm(Deref(ARG2));
struct timespec req, oreq ;
req.tv_sec = time;
req.tv_nsec = ntime;
if (nanosleep(&req, &oreq)) {
#if HAVE_STRERROR
Yap_Error(OPERATING_SYSTEM_ERROR, ARG1, "%s in thread_sleep/1", strerror(errno));
#else
Yap_Error(OPERATING_SYSTEM_ERROR, ARG1, "error %d in thread_sleep/1", errno);
#endif
return FALSE;
}
return Yap_unify(ARG3,MkIntegerTerm(oreq.tv_sec)) &&
Yap_unify(ARG4,MkIntegerTerm(oreq.tv_nsec));
#elif HAVE_SLEEP
UInt rtime;
if ((rtime = sleep(time)) < 0) {
#if HAVE_STRERROR
Yap_Error(OPERATING_SYSTEM_ERROR, ARG1, "%s in thread_sleep/1", strerror(errno));
#else
Yap_Error(OPERATING_SYSTEM_ERROR, ARG1, "error %d in thread_sleep/1", errno);
#endif
}
return Yap_unify(ARG3,MkIntegerTerm(rtime)) &&
Yap_unify(ARG4,MkIntTerm(0L));
#else
Yap_Error(OPERATING_SYSTEM_ERROR, ARG1, "no support for thread_sleep/1 in this YAP configuration");
#endif
}
static Int
p_thread_self( USES_REGS1 )
{
if (pthread_getspecific(Yap_yaamregs_key) == NULL)
return Yap_unify(MkIntegerTerm(-1), ARG1);
return Yap_unify(MkIntegerTerm(worker_id), ARG1);
}
static Int
p_thread_zombie_self( USES_REGS1 )
{
/* make sure the lock is available */
if (pthread_getspecific(Yap_yaamregs_key) == NULL)
return Yap_unify(MkIntegerTerm(-1), ARG1);
if (Yap_has_signal( YAP_ITI_SIGNAL )) {
return FALSE;
}
// fprintf(stderr," -- %d\n", worker_id);
LOCAL_ThreadHandle.in_use = FALSE;
LOCAL_ThreadHandle.zombie = TRUE;
MUTEX_UNLOCK(&(LOCAL_ThreadHandle.tlock));
return Yap_unify(MkIntegerTerm(worker_id), ARG1);
}
static Int
p_thread_status_lock( USES_REGS1 )
{
/* make sure the lock is available */
if (pthread_getspecific(Yap_yaamregs_key) == NULL)
return FALSE;
MUTEX_LOCK(&(LOCAL_ThreadHandle.tlock_status));
return Yap_unify(MkIntegerTerm(worker_id), ARG1);
}
static Int
p_thread_status_unlock( USES_REGS1 )
{
/* make sure the lock is available */
if (pthread_getspecific(Yap_yaamregs_key) == NULL)
return FALSE;
MUTEX_UNLOCK(&(LOCAL_ThreadHandle.tlock_status));
return Yap_unify(MkIntegerTerm(worker_id), ARG1);
}
Int
Yap_thread_self(void)
{
CACHE_REGS
if (pthread_getspecific(Yap_yaamregs_key) == NULL)
return -1;
return worker_id;
}
CELL
Yap_thread_create_engine(YAP_thread_attr *ops)
{
YAP_thread_attr opsv;
int new_id = allocate_new_tid();
Term t = TermNil;
/*
ok, this creates a problem, because we are initializing an engine from
some "empty" thread.
We need first to fool the thread into believing it is the main thread
*/
if (new_id == -1) {
/* YAP ERROR */
return -1;
}
if (ops == NULL) {
ops = &opsv;
ops->tsize = DefHeapSpace;
ops->ssize = DefStackSpace;
ops->sysize = 0;
ops->egoal = t;
}
if (!pthread_equal(pthread_self() , GLOBAL_master_thread) ) {
/* we are worker_id 0 for now, lock master thread so that no one messes with us */
pthread_setspecific(Yap_yaamregs_key, (const void *)&Yap_standard_regs);
MUTEX_LOCK(&(REMOTE_ThreadHandle(0).tlock));
}
if (!init_thread_engine(new_id, ops->ssize, ops->tsize, ops->sysize, t, t, (ops->egoal)))
return -1;
//REMOTE_ThreadHandle(new_id).pthread_handle = 0L;
REMOTE_ThreadHandle(new_id).id = new_id;
REMOTE_ThreadHandle(new_id).ref_count = 0;
if (!setup_engine(new_id, FALSE))
return -1;
if (!pthread_equal(pthread_self(), GLOBAL_master_thread)) {
pthread_setspecific(Yap_yaamregs_key, NULL);
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(0).tlock));
}
return new_id;
}
Int
Yap_thread_attach_engine(int wid)
{
/*
already locked
MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock));
*/
if (REMOTE_ThreadHandle(wid).ref_count ) {
REMOTE_ThreadHandle(wid).ref_count++;
REMOTE_ThreadHandle(wid).pthread_handle = pthread_self();
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
return TRUE;
}
REMOTE_ThreadHandle(wid).pthread_handle = pthread_self();
REMOTE_ThreadHandle(wid).ref_count++;
pthread_setspecific(Yap_yaamregs_key, (const void *)REMOTE_ThreadHandle(wid).current_yaam_regs);
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
return TRUE;
}
Int
Yap_thread_detach_engine(int wid)
{
MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock));
//REMOTE_ThreadHandle(wid).pthread_handle = 0;
REMOTE_ThreadHandle(wid).ref_count--;
pthread_setspecific(Yap_yaamregs_key, NULL);
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
return TRUE;
}
Int
Yap_thread_destroy_engine(int wid)
{
MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock));
if (REMOTE_ThreadHandle(wid).ref_count == 0) {
kill_thread_engine(wid, TRUE);
return TRUE;
} else {
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
return FALSE;
}
}
static Int
p_thread_join( USES_REGS1 )
{
Int tid = IntegerOfTerm(Deref(ARG1));
pthread_t thread;
MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock));
if (!(REMOTE_ThreadHandle(tid).in_use ||
REMOTE_ThreadHandle(tid).zombie)) {
// he's dead, jim
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
return FALSE;
}
if (!REMOTE_ThreadHandle(tid).tdetach == MkAtomTerm(AtomTrue)) {
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
return FALSE;
}
thread = REMOTE_ThreadHandle(tid).pthread_handle;
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
/* make sure this lock is accessible */
if (pthread_join(thread, NULL) < 0) {
/* ERROR */
return FALSE;
}
return TRUE;
}
static Int
p_thread_destroy( USES_REGS1 )
{
Int tid = IntegerOfTerm(Deref(ARG1));
MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock));
REMOTE_ThreadHandle(tid).zombie = FALSE;
REMOTE_ThreadHandle(tid).in_use = FALSE;
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
return TRUE;
}
static Int
p_thread_detach( USES_REGS1 )
{
Int tid = IntegerOfTerm(Deref(ARG1));
MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock));
if (pthread_detach(REMOTE_ThreadHandle(tid).pthread_handle) < 0) {
/* ERROR */
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
return FALSE;
}
REMOTE_ThreadHandle(tid).tdetach =
MkAtomTerm(AtomTrue);
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
return TRUE;
}
static Int
p_thread_detached( USES_REGS1 )
{
if (LOCAL_ThreadHandle.tdetach)
return Yap_unify(ARG1,LOCAL_ThreadHandle.tdetach);
else
return FALSE;
}
static Int
p_thread_detached2( USES_REGS1 )
{
Int tid = IntegerOfTerm(Deref(ARG1));
if (REMOTE_ThreadHandle(tid).tdetach)
return Yap_unify(ARG2,REMOTE_ThreadHandle(tid).tdetach);
else
return FALSE;
}
static Int
p_thread_exit( USES_REGS1 )
{
thread_die(worker_id, FALSE);
pthread_exit(NULL);
/* done, just make gcc happy */
return TRUE;
}
static Int
p_thread_set_concurrency( USES_REGS1 )
{
#if HAVE_PTHREAD_GETCONCURRENCY
int newc;
int cur;
Term tnew = Deref(ARG2);
if (IsVarTerm(tnew)) {
newc = 0;
} else if (IsIntegerTerm(tnew)) {
newc = IntegerOfTerm(tnew);
} else {
Yap_Error(TYPE_ERROR_INTEGER,tnew,"thread_set_concurrency/2");
return(FALSE);
}
cur = MkIntegerTerm(pthread_getconcurrency());
if (pthread_setconcurrency(newc) != 0) {
return FALSE;
}
return Yap_unify(ARG1, MkIntegerTerm(cur));
#else
return FALSE;
#endif
}
static Int
p_thread_yield( USES_REGS1 )
{
if (sched_yield() != 0) {
return FALSE;
}
return TRUE;
}
static Int
p_valid_thread( USES_REGS1 )
{
Int i = IntegerOfTerm(Deref(ARG1));
return REMOTE_ThreadHandle(i).in_use || REMOTE_ThreadHandle(i).zombie;
}
/* Mutex Support */
typedef struct swi_mutex {
UInt owners;
Int tid_own;
MutexEntry *alias;
pthread_mutex_t m;
UInt timestamp;
struct swi_mutex *backbone; // chain of all mutexes
struct swi_mutex *prev, *next; // chain of locked mutexes
} SWIMutex;
static SWIMutex *NewMutex(void) {
SWIMutex* mutp;
pthread_mutexattr_t mat;
#if defined(HAVE_PTHREAD_MUTEXATTR_SETKIND_NP) && !defined(__MINGW32__)
extern int pthread_mutexattr_setkind_np(pthread_mutexattr_t *attr, int kind);
#endif
LOCK(GLOBAL_MUT_ACCESS);
mutp = GLOBAL_FreeMutexes;
while (mutp) {
if ((Int)(mutp->owners) < 0) {
// just making sure
break;
}
mutp = mutp->next;
}
if (mutp == NULL) {
mutp = (SWIMutex *)Yap_AllocCodeSpace(sizeof(SWIMutex));
if (mutp == NULL) {
UNLOCK(GLOBAL_MUT_ACCESS);
return NULL;
} else {
pthread_mutexattr_init(&mat);
mutp->timestamp = 0;
#if defined(HAVE_PTHREAD_MUTEXATTR_SETKIND_NP) && !defined(__MINGW32__)
pthread_mutexattr_setkind_np(&mat, PTHREAD_MUTEX_RECURSIVE_NP);
#else
#ifdef HAVE_PTHREAD_MUTEXATTR_SETTYPE
pthread_mutexattr_settype(&mat, PTHREAD_MUTEX_RECURSIVE);
#endif
#endif
pthread_mutex_init(&mutp->m, &mat);
}
mutp->backbone = GLOBAL_mutex_backbone;
GLOBAL_mutex_backbone = mutp;
} else {
// reuse existing mutex
mutp->timestamp++;
}
mutp->owners = 0;
mutp->tid_own = 0;
mutp->alias = NIL;
UNLOCK(GLOBAL_MUT_ACCESS);
return mutp;
}
#define MutexOfTerm(t) MutexOfTerm__(t PASS_REGS)
static SWIMutex *MutexOfTerm__(Term t USES_REGS){
Term t1 = Deref(t);
SWIMutex *mut = NULL;
if (IsVarTerm(t1)) {
Yap_Error(INSTANTIATION_ERROR, t1, "mutex operation");
return NULL;
} else if (IsApplTerm(t1) && FunctorOfTerm(t1) == FunctorMutex) {
mut = AddressOfTerm(ArgOfTerm(1,t1));
if ((Int)(mut->owners) < 0 ||
IntegerOfTerm(ArgOfTerm(2,t1)) != mut->timestamp) {
Yap_Error(EXISTENCE_ERROR_MUTEX, t1, "mutex access");
return NULL;
}
} else if (IsAtomTerm(t1)) {
mut = Yap_GetMutexFromProp(AtomOfTerm(t1));
if (!mut) {
mut = NewMutex();
if ( !Yap_PutAtomMutex( AtomOfTerm(t1), mut ) ) {
return NULL;
}
}
}
return mut;
}
static Int
p_new_mutex( USES_REGS1 ){
SWIMutex* mutp;
Term t1;
if (IsVarTerm((t1 = Deref(ARG1)))) {
Term ts[2];
if (!(mutp = NewMutex()))
return FALSE;
ts[0] = MkAddressTerm(mutp);
ts[1] = MkIntegerTerm(mutp->timestamp);
if (Yap_unify(ARG1, Yap_MkApplTerm(FunctorMutex, 2, ts) ) ) {
return TRUE;
}
Yap_Error(UNINSTANTIATION_ERROR, t1, "mutex_create on an existing mutex");
return FALSE;
} else if(IsAtomTerm(t1)) {
if (!(mutp = NewMutex()))
return FALSE;
return Yap_PutAtomMutex( AtomOfTerm(t1), mutp );
} else if (IsApplTerm(t1) && FunctorOfTerm(t1) == FunctorMutex) {
Yap_Error(UNINSTANTIATION_ERROR, t1, "mutex_create on an existing mutex");
return FALSE;
}
return FALSE;
}
/** @pred mutex_destroy(+ _MutexId_)
Destroy a mutex. After this call, _MutexId_ becomes invalid and
further references yield an `existence_error` exception.
*/
static Int p_destroy_mutex( USES_REGS1 )
{
SWIMutex *mut = MutexOfTerm(Deref(ARG1));
if (!mut)
return FALSE;
if (pthread_mutex_destroy(&mut->m) < 0)
return FALSE;
if (mut->alias) {
mut->alias->Mutex = NULL;
}
mut->owners = -1;
mut->tid_own = -1;
LOCK(GLOBAL_MUT_ACCESS);
if (GLOBAL_FreeMutexes)
mut->prev = GLOBAL_FreeMutexes->prev;
else
mut->prev = NULL;
mut->next = GLOBAL_FreeMutexes;
GLOBAL_FreeMutexes = mut;
UNLOCK(GLOBAL_MUT_ACCESS);
return TRUE;
}
static bool
LockMutex( SWIMutex *mut USES_REGS)
{
#if DEBUG_LOCKS
MUTEX_LOCK(&mut->m);
#else
if (MUTEX_LOCK(&mut->m) < 0)
return FALSE;
#endif
mut->owners++;
mut->tid_own = worker_id;
if (LOCAL_Mutexes)
mut->prev = LOCAL_Mutexes->prev;
else
mut->prev = NULL;
mut->next = LOCAL_Mutexes;
LOCAL_Mutexes = NULL;
return true;
}
static bool
UnLockMutex( SWIMutex *mut USES_REGS)
{
#if DEBUG_LOCKS
MUTEX_UNLOCK(&mut->m);
#else
if (MUTEX_UNLOCK(&mut->m) < 0)
return FALSE;
#endif
mut->owners--;
if (mut->prev) {
mut->prev->next = mut->next;
} else {
LOCAL_Mutexes = mut->next;
if (mut->next)
mut->next->prev = NULL;
}
if (mut->next)
mut->next->prev = mut->prev;
return true;
}
/** @pred mutex_lock(+ _MutexId_)
Lock the mutex. Prolog mutexes are <em>recursive</em> mutexes: they
can be locked multiple times by the same thread. Only after unlocking
it as many times as it is locked, the mutex becomes available for
locking by other threads. If another thread has locked the mutex the
calling thread is suspended until to mutex is unlocked.
If _MutexId_ is an atom, and there is no current mutex with that
name, the mutex is created automatically using mutex_create/1. This
implies named mutexes need not be declared explicitly.
Please note that locking and unlocking mutexes should be paired
carefully. Especially make sure to unlock mutexes even if the protected
code fails or raises an exception. For most common cases use
with_mutex/2, which provides a safer way for handling Prolog-level
mutexes.
*/
static Int
p_lock_mutex( USES_REGS1 )
{
SWIMutex *mut = MutexOfTerm(Deref(ARG1));
if (!mut || !LockMutex( mut PASS_REGS))
return FALSE;
return TRUE;
}
/** @pred mutex_trylock(+ _MutexId_)
As mutex_lock/1, but if the mutex is held by another thread, this
predicates fails immediately.
*/
static Int
p_trylock_mutex( USES_REGS1 )
{
SWIMutex *mut = MutexOfTerm(Deref(ARG1));
if (!mut)
return FALSE;
if (MUTEX_TRYLOCK(&mut->m) == EBUSY)
return FALSE;
mut->owners++;
mut->tid_own = worker_id;
return TRUE;
}
/** @pred mutex_unlock(+ _MutexId_)
Unlock the mutex. This can only be called if the mutex is held by the
calling thread. If this is not the case, a `permission_error`
exception is raised.
*/
static Int
p_unlock_mutex( USES_REGS1 )
{
SWIMutex *mut = MutexOfTerm(Deref(ARG1));
if (!mut || !UnLockMutex( mut PASS_REGS))
return FALSE;
return TRUE;
}
/** @pred with_mutex(+ _MutexId_, : _Goal_)
Execute _Goal_ while holding _MutexId_. If _Goal_ leaves
choicepoints, these are destroyed (as in once/1). The mutex is unlocked
regardless of whether _Goal_ succeeds, fails or raises an exception.
An exception thrown by _Goal_ is re-thrown after the mutex has been
successfully unlocked. See also `mutex_create/2`.
Although described in the thread-section, this predicate is also
available in the single-threaded version, where it behaves simply as
once/1.
*/
static Int
p_with_mutex( USES_REGS1 )
{
Term excep;
Int rc = FALSE;
Int creeping = Yap_get_signal(YAP_CREEP_SIGNAL);
PredEntry *pe;
Term tm = CurrentModule;
Term tg = Deref(ARG2);
SWIMutex *mut = MutexOfTerm( ARG1 );
if (!mut || !LockMutex(mut PASS_REGS)) {
return FALSE;
}
tg = Yap_StripModule(tg, &tm);
if (IsVarTerm(tg)) {
Yap_Error(INSTANTIATION_ERROR, ARG2, "with_mutex/2");
goto end;
} else if (IsApplTerm(tg)) {
register Functor f = FunctorOfTerm(tg);
register CELL *pt;
size_t i, arity;
f = FunctorOfTerm(tg);
if (IsExtensionFunctor(f)) {
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
goto end;
}
arity = ArityOfFunctor(f);
if (arity > MaxTemps) {
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
goto end;
}
pe = RepPredProp(PredPropByFunc(f, tm));
pt = RepAppl(tg)+1;
for (i= 0; i < arity; i++ )
XREGS[i+1] = pt[i];
} else if (IsAtomTerm(tg)) {
pe = RepPredProp(PredPropByAtom(AtomOfTerm(tg), tm));
} else if (IsPairTerm(tg)) {
register CELL *pt;
Functor f;
f = FunctorDot;
pe = RepPredProp(PredPropByFunc(f, tm));
pt = RepPair(tg);
XREGS[1] = pt[0];
XREGS[2] = pt[1];
} else {
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
goto end;
}
if (
pe->OpcodeOfPred != FAIL_OPCODE &&
Yap_execute_pred(pe, NULL, true PASS_REGS) ) {
rc = TRUE;
}
end:
excep = Yap_GetException();
if ( !UnLockMutex(mut PASS_REGS) ) {
return FALSE;
}
if (creeping) {
Yap_signal( YAP_CREEP_SIGNAL );
} else if ( excep != 0) {
return Yap_JumpToEnv(excep);
}
return rc;
}
static Int
p_with_with_mutex( USES_REGS1 )
{
if (GLOBAL_WithMutex == NULL) {
p_new_mutex( PASS_REGS1 );
GLOBAL_WithMutex = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
} else {
ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex);
}
return p_lock_mutex( PASS_REGS1 );
}
static Int
p_unlock_with_mutex( USES_REGS1 )
{
ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex);
return p_unlock_mutex( PASS_REGS1 );
}
static Int
p_mutex_info( USES_REGS1 )
{
SWIMutex *mut = MutexOfTerm(Deref(ARG1));
if (!mut)
return FALSE;
return Yap_unify(ARG2, MkIntegerTerm(mut->owners)) &&
Yap_unify(ARG3, MkIntegerTerm(mut->tid_own));
return TRUE;
}
typedef struct {
UInt indx;
mbox_t mbox;
} counted_mbox;
static Int
p_mbox_create( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = GLOBAL_named_mboxes;
if (IsVarTerm(namet)) {
AtomEntry *ae;
int new;
mbox_t mbox;
ae = Yap_lookupBlob(&mbox, sizeof(mbox), &PL_Message_Queue, &new);
namet = MkAtomTerm(RepAtom(ae));
mboxp = (mbox_t *)(ae->rep.blob[0].data);
Yap_unify(ARG1, namet);
LOCK(GLOBAL_mboxq_lock);
} else if (IsAtomTerm(namet)) {
LOCK(GLOBAL_mboxq_lock);
while( mboxp && mboxp->name != namet)
mboxp = mboxp->next;
if (mboxp) {
UNLOCK(GLOBAL_mboxq_lock);
return FALSE;
}
mboxp = (mbox_t *)Yap_AllocCodeSpace(sizeof(mbox_t));
if (mboxp == NULL) {
UNLOCK(GLOBAL_mboxq_lock);
return FALSE;
}
// global mbox, for now we'll just insert in list
mboxp->next = GLOBAL_named_mboxes;
GLOBAL_named_mboxes = mboxp;
}
bool rc = mboxCreate( namet, mboxp PASS_REGS );
UNLOCK(GLOBAL_mboxq_lock);
return rc;
}
static Int
p_mbox_destroy( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = GLOBAL_named_mboxes, *prevp;
if (IsVarTerm(namet) )
return FALSE;
if (IsIntTerm(namet) ) {
return FALSE;
}
LOCK(GLOBAL_mboxq_lock);
prevp = NULL;
while( mboxp && mboxp->name != namet) {
prevp = mboxp;
mboxp = mboxp->next;
}
if (!mboxp) {
UNLOCK(GLOBAL_mboxq_lock);
return FALSE;
}
if (mboxp == GLOBAL_named_mboxes) {
GLOBAL_named_mboxes = mboxp->next;
} else {
prevp->next = mboxp->next;
}
UNLOCK(GLOBAL_mboxq_lock);
mboxDestroy(mboxp PASS_REGS);
Yap_FreeCodeSpace( (char *)mboxp );
return TRUE;
}
static mbox_t*
getMbox(Term t)
{
mbox_t* mboxp;
if (IsAtomTerm(t=Deref(t))) {
Atom at = AtomOfTerm(t);
LOCK(GLOBAL_mboxq_lock);
if (IsBlob(at)) {
mboxp = (mbox_t *)(RepAtom(at)->rep.blob[0].data);
} else {
mboxp = GLOBAL_named_mboxes;
while( mboxp && mboxp->name != t) {
mboxp = mboxp->next;
}
}
if (!mboxp->open)
mboxp = NULL;
if (mboxp) {
pthread_mutex_lock(& mboxp->mutex);
}
UNLOCK(GLOBAL_mboxq_lock);
} else if (IsIntTerm(t)) {
int wid = IntOfTerm(t);
if (REMOTE(wid) &&
(REMOTE_ThreadHandle(wid).in_use || REMOTE_ThreadHandle(wid).zombie))
{
return &REMOTE_ThreadHandle(wid).mbox_handle;
} else {
return NULL;
}
if (!mboxp->open)
mboxp = NULL;
if (mboxp) {
pthread_mutex_lock(& mboxp->mutex);
}
} else {
return NULL;
}
return mboxp;
}
static Int
p_mbox_send( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = getMbox(namet) ;
if (!mboxp)
return FALSE;
return mboxSend(mboxp, Deref(ARG2) PASS_REGS);
}
static Int
p_mbox_size( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = getMbox(namet) ;
if (!mboxp)
return FALSE;
return Yap_unify( ARG2, MkIntTerm(mboxp->nmsgs));
}
static Int
p_mbox_receive( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = getMbox(namet) ;
if (!mboxp)
return FALSE;
return mboxReceive(mboxp, Deref(ARG2) PASS_REGS);
}
static Int
p_mbox_peek( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = getMbox(namet) ;
if (!mboxp)
return FALSE;
return mboxPeek(mboxp, Deref(ARG2) PASS_REGS);
}
static Int
p_cond_create( USES_REGS1 )
{
pthread_cond_t* condp;
condp = (pthread_cond_t *)Yap_AllocCodeSpace(sizeof(pthread_cond_t));
if (condp == NULL) {
return FALSE;
}
pthread_cond_init(condp, NULL);
return Yap_unify(ARG1, MkIntegerTerm((Int)condp));
}
static Int
p_cond_destroy( USES_REGS1 )
{
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
if (pthread_cond_destroy(condp) < 0)
return FALSE;
Yap_FreeCodeSpace((void *)condp);
return TRUE;
}
static Int
p_cond_signal( USES_REGS1 )
{
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
if (pthread_cond_signal(condp) < 0)
return FALSE;
return TRUE;
}
static Int
p_cond_broadcast( USES_REGS1 )
{
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
if (pthread_cond_broadcast(condp) < 0)
return FALSE;
return TRUE;
}
static Int
p_cond_wait( USES_REGS1 )
{
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG2));
pthread_cond_wait(condp, &mut->m);
return TRUE;
}
static Int
p_thread_stacks( USES_REGS1 )
{ /* '$thread_signal'(+P) */
Int tid = IntegerOfTerm(Deref(ARG1));
Int status= TRUE;
MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock));
if (REMOTE(tid) &&
(REMOTE_ThreadHandle(tid).in_use || REMOTE_ThreadHandle(tid).zombie)) {
status &= Yap_unify(ARG2,MkIntegerTerm(REMOTE_ThreadHandle(tid).ssize));
status &= Yap_unify(ARG3,MkIntegerTerm(REMOTE_ThreadHandle(tid).tsize));
status &= Yap_unify(ARG4,MkIntegerTerm(REMOTE_ThreadHandle(tid).sysize));
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
return status;
}
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
return FALSE;
}
static Int
p_thread_atexit( USES_REGS1 )
{ /* '$thread_signal'(+P) */
Term t;
if (LOCAL_ThreadHandle.texit == NULL ||
LOCAL_ThreadHandle.texit->Entry == MkAtomTerm(AtomTrue)) {
return FALSE;
}
do {
t = Yap_PopTermFromDB(LOCAL_ThreadHandle.texit);
if (t == 0) {
if (LOCAL_Error_TYPE == OUT_OF_ATTVARS_ERROR) {
LOCAL_Error_TYPE = YAP_NO_ERROR;
if (!Yap_growglobal(NULL)) {
Yap_Error(OUT_OF_ATTVARS_ERROR, TermNil, LOCAL_ErrorMessage);
thread_die(worker_id, FALSE);
return FALSE;
}
} else {
LOCAL_Error_TYPE = YAP_NO_ERROR;
if (!Yap_growstack(LOCAL_ThreadHandle.tgoal->NOfCells*CellSize)) {
Yap_Error(OUT_OF_STACK_ERROR, TermNil, LOCAL_ErrorMessage);
thread_die(worker_id, FALSE);
return FALSE;
}
}
}
} while (t == 0);
LOCAL_ThreadHandle.texit = NULL;
return Yap_unify(ARG1, t) && Yap_unify(ARG2, LOCAL_ThreadHandle.texit_mod);
}
static Int
p_thread_signal( USES_REGS1 )
{ /* '$thread_signal'(+P) */
Int wid = IntegerOfTerm(Deref(ARG1));
/* make sure the lock is available */
MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock));
if (!REMOTE_ThreadHandle(wid).in_use ||
!REMOTE_ThreadHandle(wid).current_yaam_regs) {
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
return TRUE;
}
Yap_external_signal( wid, YAP_ITI_SIGNAL );
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
return TRUE;
}
static Int
p_no_threads( USES_REGS1 )
{ /* '$thread_signal'(+P) */
return FALSE;
}
static Int
p_nof_threads( USES_REGS1 )
{ /* '$nof_threads'(+P) */
int i = 0, wid;
LOCK(GLOBAL_ThreadHandlesLock);
for (wid = 0; wid < MAX_THREADS; wid++) {
if (!Yap_local[wid]) break;
if (REMOTE_ThreadHandle(wid).in_use)
i++;
}
UNLOCK(GLOBAL_ThreadHandlesLock);
return Yap_unify(ARG1,MkIntegerTerm(i));
}
static Int
p_max_workers( USES_REGS1 )
{ /* '$max_workers'(+P) */
return Yap_unify(ARG1,MkIntegerTerm(MAX_WORKERS));
}
static Int
p_max_threads( USES_REGS1 )
{ /* '$max_threads'(+P) */
return Yap_unify(ARG1,MkIntegerTerm(MAX_THREADS));
}
static Int
p_nof_threads_created( USES_REGS1 )
{ /* '$nof_threads'(+P) */
return Yap_unify(ARG1,MkIntTerm(GLOBAL_NOfThreadsCreated));
}
static Int
p_thread_runtime( USES_REGS1 )
{ /* '$thread_runtime'(+P) */
return Yap_unify(ARG1,MkIntegerTerm(GLOBAL_ThreadsTotalTime));
}
static Int
p_thread_self_lock( USES_REGS1 )
{ /* '$thread_unlock' */
MUTEX_LOCK(&(LOCAL_ThreadHandle.tlock));
return Yap_unify(ARG1,MkIntegerTerm(worker_id));
}
static Int
p_thread_unlock( USES_REGS1 )
{ /* '$thread_unlock' */
Int wid = IntegerOfTerm(Deref(ARG1));
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
return TRUE;
}
intptr_t
system_thread_id(void)
{
#ifdef HAVE_GETTID_SYSCALL
return syscall(__NR_gettid);
#elif defined( HAVE_GETTID_MACRO )
return gettid();
#elif defined(__WINDOWS__)
return GetCurrentThreadId();
#endif
}
void
Yap_InitFirstWorkerThreadHandle(void)
{
CACHE_REGS
LOCAL_ThreadHandle.id = 0;
LOCAL_ThreadHandle.in_use = TRUE;
LOCAL_ThreadHandle.default_yaam_regs =
&Yap_standard_regs;
LOCAL_ThreadHandle.current_yaam_regs =
&Yap_standard_regs;
LOCAL_ThreadHandle.pthread_handle = pthread_self();
pthread_mutex_init(&REMOTE_ThreadHandle(0).tlock, NULL);
pthread_mutex_init(&REMOTE_ThreadHandle(0).tlock_status, NULL);
LOCAL_ThreadHandle.tdetach = MkAtomTerm(AtomFalse);
LOCAL_ThreadHandle.ref_count = 1;
}
void Yap_InitThreadPreds(void)
{
Yap_InitCPred("$no_threads", 0, p_no_threads, 0);
Yap_InitCPred("$max_workers", 1, p_max_workers, 0);
Yap_InitCPred("$max_threads", 1, p_max_threads, 0);
Yap_InitCPred("$thread_new_tid", 1, p_thread_new_tid, 0);
Yap_InitCPred("$create_thread", 7, p_create_thread, 0);
Yap_InitCPred("$thread_self", 1, p_thread_self, SafePredFlag);
Yap_InitCPred("$thread_status_lock", 1, p_thread_status_lock, SafePredFlag);
Yap_InitCPred("$thread_status_unlock", 1, p_thread_status_unlock, SafePredFlag);
Yap_InitCPred("$thread_zombie_self", 1, p_thread_zombie_self, SafePredFlag);
Yap_InitCPred("$thread_join", 1, p_thread_join, 0);
Yap_InitCPred("$thread_destroy", 1, p_thread_destroy, 0);
Yap_InitCPred("thread_yield", 0, p_thread_yield, 0);
/** @pred thread_yield
Voluntarily relinquish the processor.
*/
Yap_InitCPred("$detach_thread", 1, p_thread_detach, 0);
Yap_InitCPred("$thread_detached", 1, p_thread_detached, 0);
Yap_InitCPred("$thread_detached", 2, p_thread_detached2, 0);
Yap_InitCPred("$thread_exit", 0, p_thread_exit, 0);
Yap_InitCPred("thread_setconcurrency", 2, p_thread_set_concurrency, 0);
/** @pred thread_setconcurrency(+ _Old_, - _New_)
Determine the concurrency of the process, which is defined as the
maximum number of concurrently active threads. `Active` here means
they are using CPU time. This option is provided if the
thread-implementation provides
`pthread_setconcurrency()`. Solaris is a typical example of this
family. On other systems this predicate unifies _Old_ to 0 (zero)
and succeeds silently.
*/
Yap_InitCPred("$valid_thread", 1, p_valid_thread, 0);
Yap_InitCPred("mutex_create", 1, p_new_mutex, SafePredFlag);
Yap_InitCPred("mutex_destroy", 1, p_destroy_mutex, SafePredFlag);
Yap_InitCPred("mutex_lock", 1, p_lock_mutex, SafePredFlag);
Yap_InitCPred("mutex_trylock", 1, p_trylock_mutex, SafePredFlag);
Yap_InitCPred("mutex_unlock", 1, p_unlock_mutex, SafePredFlag);
Yap_InitCPred("with_mutex", 2, p_with_mutex, MetaPredFlag);
Yap_InitCPred("$with_with_mutex", 1, p_with_with_mutex, 0);
Yap_InitCPred("$unlock_with_mutex", 1, p_unlock_with_mutex, 0);
Yap_InitCPred("$mutex_info", 3, p_mutex_info, SafePredFlag);
Yap_InitCPred("$cond_create", 1, p_cond_create, SafePredFlag);
Yap_InitCPred("$cond_destroy", 1, p_cond_destroy, SafePredFlag);
Yap_InitCPred("$cond_signal", 1, p_cond_signal, SafePredFlag);
Yap_InitCPred("$cond_broadcast", 1, p_cond_broadcast, SafePredFlag);
Yap_InitCPred("$cond_wait", 2, p_cond_wait, SafePredFlag);
Yap_InitCPred("$message_queue_create", 1, p_mbox_create, SafePredFlag);
Yap_InitCPred("$message_queue_destroy", 1, p_mbox_destroy, SafePredFlag);
Yap_InitCPred("$message_queue_send", 2, p_mbox_send, SafePredFlag);
Yap_InitCPred("$message_queue_receive", 2, p_mbox_receive, SafePredFlag);
Yap_InitCPred("$message_queue_size", 2, p_mbox_size, SafePredFlag);
Yap_InitCPred("$message_queue_peek", 2, p_mbox_peek, SafePredFlag);
Yap_InitCPred("$thread_stacks", 4, p_thread_stacks, SafePredFlag);
Yap_InitCPred("$signal_thread", 1, p_thread_signal, SafePredFlag);
Yap_InitCPred("$nof_threads", 1, p_nof_threads, SafePredFlag);
Yap_InitCPred("$nof_threads_created", 1, p_nof_threads_created, SafePredFlag);
Yap_InitCPred("$thread_sleep", 4, p_thread_sleep, SafePredFlag);
Yap_InitCPred("$thread_runtime", 1, p_thread_runtime, SafePredFlag);
Yap_InitCPred("$thread_self_lock", 1, p_thread_self_lock, SafePredFlag);
Yap_InitCPred("$thread_run_at_exit", 2, p_thread_atexit, SafePredFlag);
Yap_InitCPred("$thread_unlock", 1, p_thread_unlock, SafePredFlag);
#if DEBUG_LOCKS||DEBUG_PE_LOCKS
Yap_InitCPred("debug_locks", 0, p_debug_locks, SafePredFlag);
Yap_InitCPred("nodebug_locks", 0, p_nodebug_locks, SafePredFlag);
#endif
}
#else
int
Yap_NOfThreads(void) {
// GLOBAL_ThreadHandlesLock is held
#ifdef YAPOR
return 2;
#else
return 1;
#endif
}
static Int
p_no_threads(void)
{ /* '$thread_signal'(+P) */
return TRUE;
}
static Int
p_nof_threads(void)
{ /* '$nof_threads'(+P) */
return Yap_unify(ARG1,MkIntTerm(1));
}
static Int
p_max_threads(void)
{ /* '$nof_threads'(+P) */
return Yap_unify(ARG1,MkIntTerm(1));
}
static Int
p_nof_threads_created(void)
{ /* '$nof_threads'(+P) */
return Yap_unify(ARG1,MkIntTerm(1));
}
static Int
p_thread_runtime(void)
{ /* '$thread_runtime'(+P) */
return Yap_unify(ARG1,MkIntTerm(0));
}
static Int
p_thread_self(void)
{ /* '$thread_runtime'(+P) */
return Yap_unify(ARG1,MkIntTerm(0));
}
static Int
p_thread_stacks(void)
{ /* '$thread_runtime'(+P) */
return FALSE;
}
static Int
p_thread_unlock(void)
{ /* '$thread_runtime'(+P) */
return TRUE;
}
static Int
p_max_workers(void)
{ /* '$max_workers'(+P) */
return Yap_unify(ARG1,MkIntTerm(1));
}
static Int
p_new_mutex(void)
{ /* '$max_workers'(+P) */
static int mutexes = 1;
return Yap_unify(ARG1, MkIntegerTerm(mutexes++) );
}
static Int
p_with_mutex( USES_REGS1 )
{
Int mut;
Term t1 = Deref(ARG1), excep;
Int rc = FALSE;
Int creeping = Yap_get_signal(YAP_CREEP_SIGNAL);
PredEntry *pe;
Term tm = CurrentModule;
Term tg = Deref(ARG2);
if (IsVarTerm(t1)) {
p_new_mutex( PASS_REGS1 );
}
t1 = Deref(ARG1);
mut = IntOfTerm(t1);
tg = Yap_StripModule(tg, &tm);
if (IsVarTerm(tg)) {
Yap_Error(INSTANTIATION_ERROR, ARG2, "with_mutex/2");
goto end;
} else if (IsApplTerm(tg)) {
register Functor f = FunctorOfTerm(tg);
register CELL *pt;
size_t i, arity;
f = FunctorOfTerm(tg);
if (IsExtensionFunctor(f)) {
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
goto end;
}
arity = ArityOfFunctor(f);
if (arity > MaxTemps) {
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
goto end;
}
pe = RepPredProp(PredPropByFunc(f, tm));
pt = RepAppl(tg)+1;
for (i= 0; i < arity; i++ )
XREGS[i+1] = pt[i];
} else if (IsAtomTerm(tg)) {
pe = RepPredProp(PredPropByAtom(AtomOfTerm(tg), tm));
} else if (IsPairTerm(tg)) {
register CELL *pt;
Functor f;
f = FunctorDot;
pe = RepPredProp(PredPropByFunc(f, tm));
pt = RepPair(tg);
XREGS[1] = pt[0];
XREGS[2] = pt[1];
} else {
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
goto end;
}
if (
pe->OpcodeOfPred != FAIL_OPCODE &&
Yap_execute_pred(pe, NULL, false PASS_REGS) ) {
rc = TRUE;
}
end:
ARG1 = MkIntegerTerm(mut);
excep = Yap_GetException();
if (creeping) {
Yap_signal( YAP_CREEP_SIGNAL );
} else if ( excep != 0) {
return Yap_JumpToEnv(excep);
}
return rc;
}
void
Yap_InitFirstWorkerThreadHandle(void)
{
}
void Yap_InitThreadPreds(void)
{
Yap_InitCPred("with_mutex", 2, p_with_mutex, MetaPredFlag);
Yap_InitCPred("mutex_create", 1, p_new_mutex, SafePredFlag);
Yap_InitCPred("$max_workers", 1, p_max_workers, 0);
Yap_InitCPred("$thread_self", 1, p_thread_self, SafePredFlag);
Yap_InitCPred("$no_threads", 0, p_no_threads, SafePredFlag);
Yap_InitCPred("$max_threads", 1, p_max_threads, SafePredFlag);
Yap_InitCPred("$nof_threads", 1, p_nof_threads, SafePredFlag);
Yap_InitCPred("$nof_threads_created", 1, p_nof_threads_created, SafePredFlag);
Yap_InitCPred("$thread_stacks", 4, p_thread_stacks, SafePredFlag);
Yap_InitCPred("$thread_runtime", 1, p_thread_runtime, SafePredFlag);
Yap_InitCPred("$thread_unlock", 1, p_thread_unlock, SafePredFlag);
#if DEBUG_LOCKS||DEBUG_PE_LOCKS
Yap_InitCPred("debug_locks", 0, p_debug_locks, SafePredFlag);
Yap_InitCPred("nodebug_locks", 0, p_nodebug_locks, SafePredFlag);
#endif
}
#endif /* THREADS */
/**
@}
*/