1888 lines
49 KiB
C
1888 lines
49 KiB
C
/*************************************************************************
|
|
* *
|
|
* YAP Prolog *
|
|
* *
|
|
* Yap Prolog was developed at NCCUP - Universidade do Porto *
|
|
* *
|
|
* Copyright L.Damas, V.S.Costa and Universidade do Porto 1985-1997 *
|
|
* *
|
|
**************************************************************************
|
|
* *
|
|
* File: stdpreds.c *
|
|
* Last rev: *
|
|
* mods: *
|
|
* comments: threads *
|
|
* *
|
|
*************************************************************************/
|
|
#ifdef SCCS
|
|
static char SccsId[] = "%W% %G%";
|
|
#endif
|
|
|
|
/**
|
|
@ingroup Threads
|
|
@{
|
|
*/
|
|
|
|
#include "Yap.h"
|
|
#include "Yatom.h"
|
|
#include "YapHeap.h"
|
|
#include "eval.h"
|
|
#include "yapio.h"
|
|
#include "pl-shared.h"
|
|
#include <stdio.h>
|
|
#include <SWI-Prolog.h>
|
|
#if HAVE_STRING_H
|
|
#include <string.h>
|
|
#endif
|
|
#ifdef TABLING
|
|
#include "tab.macros.h"
|
|
#endif /* TABLING */
|
|
|
|
|
|
PL_blob_t PL_Message_Queue = {
|
|
PL_BLOB_MAGIC,
|
|
PL_BLOB_UNIQUE | PL_BLOB_NOCOPY,
|
|
"message_queue",
|
|
0, // release
|
|
0, // compare
|
|
0, // write
|
|
0 // acquire
|
|
};
|
|
|
|
|
|
#if DEBUG_LOCKS||DEBUG_PE_LOCKS
|
|
|
|
int debug_locks = FALSE, debug_pe_locks = FALSE;
|
|
static Int p_debug_locks( USES_REGS1 ) { debugf=stdout; debug_pe_locks = 1; return TRUE; }
|
|
|
|
static Int p_nodebug_locks( USES_REGS1 ) { debug_locks = 0; debug_pe_locks = 0; return TRUE; }
|
|
|
|
#endif
|
|
|
|
#if THREADS
|
|
|
|
#include "threads.h"
|
|
|
|
|
|
/*
|
|
* This file includes the definition of threads in Yap. Threads
|
|
* are supposed to be compatible with the SWI-Prolog thread package.
|
|
*
|
|
*/
|
|
|
|
static void
|
|
set_system_thread_id(int wid, PL_thread_info_t *info)
|
|
{
|
|
if (!info)
|
|
info = (PL_thread_info_t *)malloc(sizeof(PL_thread_info_t));
|
|
info = SWI_thread_info(wid, info);
|
|
info->tid = pthread_self();
|
|
info->has_tid = TRUE;
|
|
#ifdef HAVE_GETTID_SYSCALL
|
|
info->pid = syscall(__NR_gettid);
|
|
#else
|
|
#ifdef HAVE_GETTID_MACRO
|
|
info->pid = gettid();
|
|
#else
|
|
#ifdef __WINDOWS__
|
|
info->w32id = GetCurrentThreadId();
|
|
#endif
|
|
#endif
|
|
#endif
|
|
}
|
|
|
|
int
|
|
Yap_ThreadID( void )
|
|
{
|
|
int new_worker_id = 0;
|
|
pthread_t self = pthread_self();
|
|
while(new_worker_id < MAX_THREADS &&
|
|
Yap_local[new_worker_id] &&
|
|
(REMOTE_ThreadHandle(new_worker_id).in_use == TRUE ||
|
|
REMOTE_ThreadHandle(new_worker_id).zombie == TRUE) ) {
|
|
if (pthread_equal(self , REMOTE_ThreadHandle(new_worker_id).pthread_handle) ) {
|
|
return new_worker_id;
|
|
}
|
|
new_worker_id++;
|
|
}
|
|
return -1;
|
|
}
|
|
|
|
int
|
|
Yap_NOfThreads(void) {
|
|
// GLOBAL_ThreadHandlesLock is held
|
|
return GLOBAL_NOfThreads;
|
|
}
|
|
|
|
static int
|
|
allocate_new_tid(void)
|
|
{
|
|
int new_worker_id = 0;
|
|
LOCK(GLOBAL_ThreadHandlesLock);
|
|
while(new_worker_id < MAX_THREADS &&
|
|
Yap_local[new_worker_id] &&
|
|
(REMOTE_ThreadHandle(new_worker_id).in_use == TRUE ||
|
|
REMOTE_ThreadHandle(new_worker_id).zombie == TRUE) )
|
|
new_worker_id++;
|
|
if (new_worker_id >= MAX_THREADS) {
|
|
new_worker_id = -1;
|
|
} else if (!Yap_local[new_worker_id]) {
|
|
if (!Yap_InitThread(new_worker_id)) {
|
|
UNLOCK(GLOBAL_ThreadHandlesLock);
|
|
return -1;
|
|
}
|
|
MUTEX_LOCK(&(REMOTE_ThreadHandle(new_worker_id).tlock));
|
|
REMOTE_ThreadHandle(new_worker_id).in_use = TRUE;
|
|
} else if (new_worker_id < MAX_THREADS) {
|
|
// reuse existing thread
|
|
MUTEX_LOCK(&(REMOTE_ThreadHandle(new_worker_id).tlock));
|
|
REMOTE_ThreadHandle(new_worker_id).in_use = TRUE;
|
|
} else {
|
|
new_worker_id = -1;
|
|
}
|
|
UNLOCK(GLOBAL_ThreadHandlesLock);
|
|
return new_worker_id;
|
|
}
|
|
|
|
|
|
static bool
|
|
mboxCreate( Term namet, mbox_t *mboxp USES_REGS )
|
|
{
|
|
pthread_mutex_t *mutexp;
|
|
pthread_cond_t *condp;
|
|
struct idb_queue *msgsp;
|
|
|
|
memset(mboxp, 0, sizeof(mbox_t));
|
|
condp = & mboxp->cond;
|
|
pthread_cond_init(condp, NULL);
|
|
mutexp = & mboxp->mutex;
|
|
pthread_mutex_init(mutexp, NULL);
|
|
msgsp = & mboxp->msgs;
|
|
mboxp->nmsgs = 0;
|
|
mboxp->nclients = 0;
|
|
Yap_init_tqueue(msgsp);
|
|
// match at the end, when everything is built.
|
|
mboxp->name = namet;
|
|
mboxp->open = true;
|
|
return true;
|
|
}
|
|
|
|
static bool
|
|
mboxDestroy( mbox_t *mboxp USES_REGS )
|
|
{
|
|
pthread_mutex_t *mutexp = &mboxp->mutex;
|
|
pthread_cond_t *condp = &mboxp->cond;
|
|
struct idb_queue *msgsp = &mboxp->msgs;
|
|
mboxp->open = false;
|
|
if (mboxp->nclients == 0 ) {
|
|
pthread_cond_destroy(condp);
|
|
pthread_mutex_destroy(mutexp);
|
|
Yap_destroy_tqueue(msgsp PASS_REGS);
|
|
// at this point, there is nothing left to unlock!
|
|
return true;
|
|
} else {
|
|
/* we have clients in the mailbox, try to wake them up one by one */
|
|
pthread_cond_broadcast(condp);
|
|
pthread_mutex_unlock(mutexp);
|
|
return true;
|
|
}
|
|
}
|
|
|
|
static bool
|
|
mboxSend( mbox_t *mboxp, Term t USES_REGS )
|
|
{
|
|
pthread_mutex_t *mutexp = &mboxp->mutex;
|
|
pthread_cond_t *condp = &mboxp->cond;
|
|
struct idb_queue *msgsp = &mboxp->msgs;
|
|
|
|
if (!mboxp->open) {
|
|
// oops, dead mailbox
|
|
return false;
|
|
}
|
|
Yap_enqueue_tqueue(msgsp, t PASS_REGS);
|
|
// printf("+ (%d) %d/%d\n", worker_id,mboxp->nclients, mboxp->nmsgs);
|
|
mboxp->nmsgs++;
|
|
pthread_cond_broadcast(condp);
|
|
pthread_mutex_unlock(mutexp);
|
|
return true;
|
|
}
|
|
|
|
static bool
|
|
mboxReceive( mbox_t *mboxp, Term t USES_REGS )
|
|
{
|
|
pthread_mutex_t *mutexp = &mboxp->mutex;
|
|
pthread_cond_t *condp = &mboxp->cond;
|
|
struct idb_queue *msgsp = &mboxp->msgs;
|
|
bool rc;
|
|
|
|
if (!mboxp->open){
|
|
return false; // don't try to read if someone else already closed down...
|
|
}
|
|
mboxp->nclients++;
|
|
do {
|
|
rc = mboxp->nmsgs && Yap_dequeue_tqueue(msgsp, t, false, true PASS_REGS);
|
|
if (rc) {
|
|
mboxp->nclients--;
|
|
mboxp->nmsgs--;
|
|
//printf("- (%d) %d/%d\n", worker_id,mboxp->nclients, mboxp->nmsgs);
|
|
// Yap_do_low_level_trace=1;
|
|
pthread_mutex_unlock(mutexp);
|
|
return true;
|
|
} else if (!mboxp->open) {
|
|
//printf("o (%d)\n", worker_id);
|
|
mboxp->nclients--;
|
|
if (!mboxp->nclients) {// release
|
|
pthread_cond_destroy(condp);
|
|
pthread_mutex_destroy(mutexp);
|
|
Yap_destroy_tqueue(msgsp PASS_REGS);
|
|
// at this point, there is nothing left to unlock!
|
|
} else {
|
|
pthread_cond_broadcast(condp);
|
|
pthread_mutex_unlock(mutexp);
|
|
}
|
|
return false;
|
|
} else {
|
|
pthread_cond_wait(condp, mutexp);
|
|
}
|
|
} while (!rc);
|
|
return rc;
|
|
}
|
|
|
|
static bool
|
|
mboxPeek( mbox_t *mboxp, Term t USES_REGS )
|
|
{
|
|
pthread_mutex_t *mutexp = &mboxp->mutex;
|
|
struct idb_queue *msgsp = &mboxp->msgs;
|
|
bool rc = Yap_dequeue_tqueue(msgsp, t, false, false PASS_REGS);
|
|
pthread_mutex_unlock(mutexp);
|
|
return rc;
|
|
}
|
|
|
|
static int
|
|
store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term tgoal, Term tdetach, Term texit)
|
|
{
|
|
CACHE_REGS
|
|
UInt pm; /* memory to be requested */
|
|
Term tmod;
|
|
|
|
if (tsize < MinTrailSpace)
|
|
tsize = MinTrailSpace;
|
|
if (ssize < MinStackSpace)
|
|
ssize = MinStackSpace;
|
|
REMOTE_ThreadHandle(new_worker_id).ssize = ssize;
|
|
REMOTE_ThreadHandle(new_worker_id).tsize = tsize;
|
|
REMOTE_ThreadHandle(new_worker_id).sysize = sysize;
|
|
|
|
if ((REGSTORE *)pthread_getspecific(Yap_yaamregs_key)) {
|
|
REMOTE_c_input_stream(new_worker_id) = LOCAL_c_input_stream;
|
|
REMOTE_c_output_stream(new_worker_id) = LOCAL_c_output_stream;
|
|
REMOTE_c_error_stream(new_worker_id) = LOCAL_c_error_stream;
|
|
} else {
|
|
// thread is created by a thread that has never run Prolog
|
|
REMOTE_c_input_stream(new_worker_id) = REMOTE_c_input_stream(0);
|
|
REMOTE_c_output_stream(new_worker_id) = REMOTE_c_output_stream(0);
|
|
REMOTE_c_error_stream(new_worker_id) = REMOTE_c_error_stream(0);
|
|
}
|
|
pm = (ssize + tsize)*K1;
|
|
if (!(REMOTE_ThreadHandle(new_worker_id).stack_address = malloc(pm))) {
|
|
return FALSE;
|
|
}
|
|
REMOTE_ThreadHandle(new_worker_id).tgoal =
|
|
Yap_StoreTermInDB(Deref(tgoal), 7);
|
|
|
|
if (CurrentModule) {
|
|
REMOTE_ThreadHandle(new_worker_id).cmod =
|
|
CurrentModule;
|
|
} else {
|
|
REMOTE_ThreadHandle(new_worker_id).cmod = USER_MODULE;
|
|
}
|
|
tdetach = Deref(tdetach);
|
|
if (IsVarTerm(tdetach)){
|
|
REMOTE_ThreadHandle(new_worker_id).tdetach =
|
|
MkAtomTerm(AtomFalse);
|
|
} else {
|
|
REMOTE_ThreadHandle(new_worker_id).tdetach =
|
|
tdetach;
|
|
}
|
|
tmod = CurrentModule;
|
|
texit = Yap_StripModule(Deref(texit), &tmod);
|
|
if (IsAtomTerm(tmod)) {
|
|
REMOTE_ThreadHandle(new_worker_id).texit_mod = tmod;
|
|
} else {
|
|
Yap_Error(TYPE_ERROR_ATOM,tmod,"module in exit call should be an atom");
|
|
}
|
|
REMOTE_ThreadHandle(new_worker_id).texit =
|
|
Yap_StoreTermInDB(texit,7);
|
|
REMOTE_ThreadHandle(new_worker_id).local_preds =
|
|
NULL;
|
|
REMOTE_ThreadHandle(new_worker_id).start_of_timesp =
|
|
NULL;
|
|
REMOTE_ThreadHandle(new_worker_id).last_timep =
|
|
NULL;
|
|
REMOTE_ScratchPad(new_worker_id).ptr =
|
|
NULL;
|
|
// reset arena info
|
|
REMOTE_GlobalArena(new_worker_id) =0;
|
|
return TRUE;
|
|
}
|
|
|
|
|
|
static void
|
|
kill_thread_engine (int wid, int always_die)
|
|
{
|
|
Prop p0 = AbsPredProp(REMOTE_ThreadHandle(wid).local_preds);
|
|
GlobalEntry *gl = REMOTE_GlobalVariables(wid);
|
|
|
|
REMOTE_ThreadHandle(wid).local_preds = NIL;
|
|
REMOTE_GlobalVariables(wid) = NULL;
|
|
/* kill all thread local preds */
|
|
while(p0) {
|
|
PredEntry *ap = RepPredProp(p0);
|
|
p0 = ap->NextOfPE;
|
|
Yap_Abolish(ap);
|
|
Yap_FreeCodeSpace((char *)ap);
|
|
}
|
|
while (gl) {
|
|
gl->global = TermFoundVar;
|
|
gl = gl->NextGE;
|
|
}
|
|
Yap_KillStacks(wid);
|
|
REMOTE_Signals(wid) = 0L;
|
|
// must be done before relessing the memory used to store
|
|
// thread local time.
|
|
if (!always_die) {
|
|
/* called by thread itself */
|
|
GLOBAL_ThreadsTotalTime += Yap_cputime();
|
|
}
|
|
if (REMOTE_ScratchPad(wid).ptr)
|
|
free(REMOTE_ScratchPad(wid).ptr);
|
|
// if (REMOTE_TmpPred(wid).ptr)
|
|
// free(REMOTE_TmpPred(wid).ptr);
|
|
REMOTE_PL_local_data_p(wid)->reg_cache =
|
|
REMOTE_ThreadHandle(wid).current_yaam_regs = NULL;
|
|
if (REMOTE_ThreadHandle(wid).start_of_timesp)
|
|
free(REMOTE_ThreadHandle(wid).start_of_timesp);
|
|
if (REMOTE_ThreadHandle(wid).last_timep)
|
|
free(REMOTE_ThreadHandle(wid).last_timep);
|
|
if (REMOTE_ThreadHandle(wid).texit) {
|
|
Yap_FreeCodeSpace((ADDR)REMOTE_ThreadHandle(wid).texit);
|
|
}
|
|
/* FreeCodeSpace requires LOCAL requires yaam_regs */
|
|
free(REMOTE_ThreadHandle(wid).default_yaam_regs);
|
|
REMOTE_ThreadHandle(wid).default_yaam_regs = NULL;
|
|
LOCK(GLOBAL_ThreadHandlesLock);
|
|
GLOBAL_NOfThreads--;
|
|
UNLOCK(GLOBAL_ThreadHandlesLock);
|
|
MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock));
|
|
if (REMOTE_ThreadHandle(wid).tdetach == MkAtomTerm(AtomTrue) ||
|
|
always_die) {
|
|
REMOTE_ThreadHandle(wid).zombie = FALSE;
|
|
REMOTE_ThreadHandle(wid).in_use = FALSE;
|
|
}
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
|
|
}
|
|
|
|
static void
|
|
thread_die(int wid, int always_die)
|
|
{
|
|
kill_thread_engine(wid, always_die);
|
|
}
|
|
|
|
static int
|
|
setup_engine(int myworker_id, int init_thread)
|
|
{
|
|
CACHE_REGS
|
|
REGSTORE *standard_regs;
|
|
|
|
set_system_thread_id( myworker_id, NULL );
|
|
standard_regs = (REGSTORE *)calloc(1,sizeof(REGSTORE));
|
|
if (!standard_regs)
|
|
return FALSE;
|
|
regcache = standard_regs;
|
|
/* create the YAAM descriptor */
|
|
REMOTE_ThreadHandle(myworker_id).default_yaam_regs = standard_regs;
|
|
REMOTE_ThreadHandle(myworker_id).current_yaam_regs = standard_regs;
|
|
REMOTE_PL_local_data_p(myworker_id)->reg_cache = standard_regs;
|
|
Yap_InitExStacks(myworker_id, REMOTE_ThreadHandle(myworker_id).tsize, REMOTE_ThreadHandle(myworker_id).ssize);
|
|
REMOTE_SourceModule(myworker_id) = CurrentModule = REMOTE_ThreadHandle(myworker_id).cmod;
|
|
// create a mbox
|
|
mboxCreate( MkIntTerm(myworker_id), &REMOTE_ThreadHandle(myworker_id).mbox_handle PASS_REGS );
|
|
Yap_InitTime( myworker_id );
|
|
Yap_InitYaamRegs( myworker_id );
|
|
REFRESH_CACHE_REGS
|
|
Yap_ReleasePreAllocCodeSpace(Yap_PreAllocCodeSpace());
|
|
/* I exist */
|
|
GLOBAL_NOfThreadsCreated++;
|
|
GLOBAL_NOfThreads++;
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(myworker_id).tlock));
|
|
#ifdef TABLING
|
|
new_dependency_frame(REMOTE_top_dep_fr(myworker_id), FALSE, NULL, NULL, B, NULL, FALSE, NULL); /* same as in Yap_init_root_frames() */
|
|
#endif /* TABLING */
|
|
return TRUE;
|
|
}
|
|
|
|
static void
|
|
start_thread(int myworker_id)
|
|
{
|
|
CACHE_REGS
|
|
pthread_setspecific(Yap_yaamregs_key, (void *)REMOTE_ThreadHandle(myworker_id).default_yaam_regs);
|
|
REFRESH_CACHE_REGS;
|
|
worker_id = myworker_id;
|
|
LOCAL = REMOTE(myworker_id);
|
|
}
|
|
|
|
static void *
|
|
thread_run(void *widp)
|
|
{
|
|
CACHE_REGS
|
|
Term tgoal, t;
|
|
Term tgs[2];
|
|
int myworker_id = *((int *)widp);
|
|
#ifdef OUTPUT_THREADS_TABLING
|
|
char thread_name[25];
|
|
char filename[YAP_FILENAME_MAX];
|
|
|
|
sprintf(thread_name, "/thread_output_%d", myworker_id);
|
|
strcpy(filename, YAP_BINDIR);
|
|
strncat(filename, thread_name, 25);
|
|
REMOTE_thread_output(myworker_id) = fopen(filename, "w");
|
|
#endif /* OUTPUT_THREADS_TABLING */
|
|
start_thread(myworker_id);
|
|
REFRESH_CACHE_REGS;
|
|
do {
|
|
t = tgs[0] = Yap_PopTermFromDB(LOCAL_ThreadHandle.tgoal);
|
|
if (t == 0) {
|
|
if (LOCAL_Error_TYPE == OUT_OF_ATTVARS_ERROR) {
|
|
LOCAL_Error_TYPE = YAP_NO_ERROR;
|
|
if (!Yap_growglobal(NULL)) {
|
|
Yap_Error(OUT_OF_ATTVARS_ERROR, TermNil, LOCAL_ErrorMessage);
|
|
thread_die(worker_id, FALSE);
|
|
return NULL;
|
|
}
|
|
} else {
|
|
LOCAL_Error_TYPE = YAP_NO_ERROR;
|
|
if (!Yap_growstack(LOCAL_ThreadHandle.tgoal->NOfCells*CellSize)) {
|
|
Yap_Error(OUT_OF_STACK_ERROR, TermNil, LOCAL_ErrorMessage);
|
|
thread_die(worker_id, FALSE);
|
|
return NULL;
|
|
}
|
|
}
|
|
}
|
|
} while (t == 0);
|
|
REMOTE_ThreadHandle(myworker_id).tgoal = NULL;
|
|
tgs[1] = LOCAL_ThreadHandle.tdetach;
|
|
tgoal = Yap_MkApplTerm(FunctorThreadRun, 2, tgs);
|
|
Yap_RunTopGoal(tgoal);
|
|
#ifdef TABLING
|
|
{
|
|
tab_ent_ptr tab_ent;
|
|
|
|
tab_ent = GLOBAL_root_tab_ent;
|
|
while (tab_ent) {
|
|
abolish_table(tab_ent);
|
|
tab_ent = TabEnt_next(tab_ent);
|
|
}
|
|
FREE_DEPENDENCY_FRAME(REMOTE_top_dep_fr(worker_id));
|
|
REMOTE_top_dep_fr(worker_id) = NULL;
|
|
#ifdef USE_PAGES_MALLOC
|
|
DETACH_PAGES(_pages_void);
|
|
#endif /* USE_PAGES_MALLOC */
|
|
DETACH_PAGES(_pages_tab_ent);
|
|
#if defined(THREADS_FULL_SHARING) || defined(THREADS_CONSUMER_SHARING)
|
|
DETACH_PAGES(_pages_sg_ent);
|
|
#endif /* THREADS_FULL_SHARING || THREADS_CONSUMER_SHARING */
|
|
DETACH_PAGES(_pages_sg_fr);
|
|
DETACH_PAGES(_pages_dep_fr);
|
|
DETACH_PAGES(_pages_sg_node);
|
|
DETACH_PAGES(_pages_sg_hash);
|
|
DETACH_PAGES(_pages_ans_node);
|
|
DETACH_PAGES(_pages_ans_hash);
|
|
#if defined(THREADS_FULL_SHARING)
|
|
DETACH_PAGES(_pages_ans_ref_node);
|
|
#endif /* THREADS_FULL_SHARING */
|
|
DETACH_PAGES(_pages_gt_node);
|
|
DETACH_PAGES(_pages_gt_hash);
|
|
#ifdef OUTPUT_THREADS_TABLING
|
|
fclose(LOCAL_thread_output);
|
|
#endif /* OUTPUT_THREADS_TABLING */
|
|
|
|
}
|
|
#endif /* TABLING */
|
|
thread_die(worker_id, FALSE);
|
|
return NULL;
|
|
}
|
|
|
|
static Int
|
|
p_thread_new_tid( USES_REGS1 )
|
|
{
|
|
int new_worker = allocate_new_tid();
|
|
if (new_worker == -1) {
|
|
Yap_Error(RESOURCE_ERROR_MAX_THREADS, MkIntegerTerm(MAX_THREADS), "");
|
|
return FALSE;
|
|
}
|
|
return Yap_unify(MkIntegerTerm(new_worker), ARG1);
|
|
}
|
|
|
|
static int
|
|
init_thread_engine(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term tgoal, Term tdetach, Term texit)
|
|
{
|
|
return store_specs(new_worker_id, ssize, tsize, sysize, tgoal, tdetach, texit);
|
|
}
|
|
|
|
static Int
|
|
p_create_thread( USES_REGS1 )
|
|
{
|
|
UInt ssize;
|
|
UInt tsize;
|
|
UInt sysize;
|
|
Term x2 = Deref(ARG2);
|
|
Term x3 = Deref(ARG3);
|
|
Term x4 = Deref(ARG4);
|
|
int new_worker_id = IntegerOfTerm(Deref(ARG7)),
|
|
owid = worker_id;
|
|
|
|
// fprintf(stderr," %d --> %d\n", worker_id, new_worker_id);
|
|
if (IsBigIntTerm(x2))
|
|
return FALSE;
|
|
if (IsBigIntTerm(x3))
|
|
return FALSE;
|
|
ssize = IntegerOfTerm(x2);
|
|
tsize = IntegerOfTerm(x3);
|
|
sysize = IntegerOfTerm(x4);
|
|
/* UInt systemsize = IntegerOfTerm(Deref(ARG4)); */
|
|
if (new_worker_id == -1) {
|
|
/* YAP ERROR */
|
|
return FALSE;
|
|
}
|
|
/* make sure we can proceed */
|
|
if (!init_thread_engine(new_worker_id, ssize, tsize, sysize, ARG1, ARG5, ARG6))
|
|
return FALSE;
|
|
//REMOTE_ThreadHandle(new_worker_id).pthread_handle = 0L;
|
|
REMOTE_ThreadHandle(new_worker_id).id = new_worker_id;
|
|
REMOTE_ThreadHandle(new_worker_id).ref_count = 1;
|
|
setup_engine(new_worker_id, FALSE);
|
|
if ((REMOTE_ThreadHandle(new_worker_id).ret = pthread_create(&REMOTE_ThreadHandle(new_worker_id).pthread_handle, NULL, thread_run, (void *)(&(REMOTE_ThreadHandle(new_worker_id).id)))) == 0) {
|
|
pthread_setspecific(Yap_yaamregs_key, (const void *)REMOTE_ThreadHandle(owid).current_yaam_regs);
|
|
/* wait until the client is initialised */
|
|
return TRUE;
|
|
}
|
|
pthread_setspecific(Yap_yaamregs_key, (const void *)REMOTE_ThreadHandle(owid).current_yaam_regs);
|
|
return FALSE;
|
|
}
|
|
|
|
static Int
|
|
p_thread_sleep( USES_REGS1 )
|
|
{
|
|
UInt time = IntegerOfTerm(Deref(ARG1));
|
|
#if HAVE_NANOSLEEP
|
|
UInt ntime = IntegerOfTerm(Deref(ARG2));
|
|
struct timespec req, oreq ;
|
|
req.tv_sec = time;
|
|
req.tv_nsec = ntime;
|
|
if (nanosleep(&req, &oreq)) {
|
|
#if HAVE_STRERROR
|
|
Yap_Error(OPERATING_SYSTEM_ERROR, ARG1, "%s in thread_sleep/1", strerror(errno));
|
|
#else
|
|
Yap_Error(OPERATING_SYSTEM_ERROR, ARG1, "error %d in thread_sleep/1", errno);
|
|
#endif
|
|
return FALSE;
|
|
}
|
|
return Yap_unify(ARG3,MkIntegerTerm(oreq.tv_sec)) &&
|
|
Yap_unify(ARG4,MkIntegerTerm(oreq.tv_nsec));
|
|
#elif HAVE_SLEEP
|
|
UInt rtime;
|
|
if ((rtime = sleep(time)) < 0) {
|
|
#if HAVE_STRERROR
|
|
Yap_Error(OPERATING_SYSTEM_ERROR, ARG1, "%s in thread_sleep/1", strerror(errno));
|
|
#else
|
|
Yap_Error(OPERATING_SYSTEM_ERROR, ARG1, "error %d in thread_sleep/1", errno);
|
|
#endif
|
|
}
|
|
return Yap_unify(ARG3,MkIntegerTerm(rtime)) &&
|
|
Yap_unify(ARG4,MkIntTerm(0L));
|
|
#else
|
|
Yap_Error(OPERATING_SYSTEM_ERROR, ARG1, "no support for thread_sleep/1 in this YAP configuration");
|
|
#endif
|
|
}
|
|
|
|
static Int
|
|
p_thread_self( USES_REGS1 )
|
|
{
|
|
if (pthread_getspecific(Yap_yaamregs_key) == NULL)
|
|
return Yap_unify(MkIntegerTerm(-1), ARG1);
|
|
return Yap_unify(MkIntegerTerm(worker_id), ARG1);
|
|
}
|
|
|
|
|
|
static Int
|
|
p_thread_zombie_self( USES_REGS1 )
|
|
{
|
|
/* make sure the lock is available */
|
|
if (pthread_getspecific(Yap_yaamregs_key) == NULL)
|
|
return Yap_unify(MkIntegerTerm(-1), ARG1);
|
|
if (Yap_has_signal( YAP_ITI_SIGNAL )) {
|
|
return FALSE;
|
|
}
|
|
// fprintf(stderr," -- %d\n", worker_id);
|
|
LOCAL_ThreadHandle.in_use = FALSE;
|
|
LOCAL_ThreadHandle.zombie = TRUE;
|
|
MUTEX_UNLOCK(&(LOCAL_ThreadHandle.tlock));
|
|
return Yap_unify(MkIntegerTerm(worker_id), ARG1);
|
|
}
|
|
|
|
static Int
|
|
p_thread_status_lock( USES_REGS1 )
|
|
{
|
|
/* make sure the lock is available */
|
|
if (pthread_getspecific(Yap_yaamregs_key) == NULL)
|
|
return FALSE;
|
|
MUTEX_LOCK(&(LOCAL_ThreadHandle.tlock_status));
|
|
return Yap_unify(MkIntegerTerm(worker_id), ARG1);
|
|
}
|
|
|
|
static Int
|
|
p_thread_status_unlock( USES_REGS1 )
|
|
{
|
|
/* make sure the lock is available */
|
|
if (pthread_getspecific(Yap_yaamregs_key) == NULL)
|
|
return FALSE;
|
|
MUTEX_UNLOCK(&(LOCAL_ThreadHandle.tlock_status));
|
|
return Yap_unify(MkIntegerTerm(worker_id), ARG1);
|
|
}
|
|
|
|
Int
|
|
Yap_thread_self(void)
|
|
{
|
|
CACHE_REGS
|
|
if (pthread_getspecific(Yap_yaamregs_key) == NULL)
|
|
return -1;
|
|
return worker_id;
|
|
}
|
|
|
|
CELL
|
|
Yap_thread_create_engine(YAP_thread_attr *ops)
|
|
{
|
|
YAP_thread_attr opsv;
|
|
int new_id = allocate_new_tid();
|
|
Term t = TermNil;
|
|
|
|
/*
|
|
ok, this creates a problem, because we are initializing an engine from
|
|
some "empty" thread.
|
|
We need first to fool the thread into believing it is the main thread
|
|
*/
|
|
if (new_id == -1) {
|
|
/* YAP ERROR */
|
|
return -1;
|
|
}
|
|
if (ops == NULL) {
|
|
ops = &opsv;
|
|
ops->tsize = DefHeapSpace;
|
|
ops->ssize = DefStackSpace;
|
|
ops->sysize = 0;
|
|
ops->egoal = t;
|
|
}
|
|
if (!pthread_equal(pthread_self() , GLOBAL_master_thread) ) {
|
|
/* we are worker_id 0 for now, lock master thread so that no one messes with us */
|
|
pthread_setspecific(Yap_yaamregs_key, (const void *)&Yap_standard_regs);
|
|
MUTEX_LOCK(&(REMOTE_ThreadHandle(0).tlock));
|
|
}
|
|
if (!init_thread_engine(new_id, ops->ssize, ops->tsize, ops->sysize, t, t, (ops->egoal)))
|
|
return -1;
|
|
//REMOTE_ThreadHandle(new_id).pthread_handle = 0L;
|
|
REMOTE_ThreadHandle(new_id).id = new_id;
|
|
REMOTE_ThreadHandle(new_id).ref_count = 0;
|
|
if (!setup_engine(new_id, FALSE))
|
|
return -1;
|
|
if (!pthread_equal(pthread_self(), GLOBAL_master_thread)) {
|
|
pthread_setspecific(Yap_yaamregs_key, NULL);
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(0).tlock));
|
|
}
|
|
return new_id;
|
|
}
|
|
|
|
Int
|
|
Yap_thread_attach_engine(int wid)
|
|
{
|
|
/*
|
|
already locked
|
|
MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock));
|
|
*/
|
|
if (REMOTE_ThreadHandle(wid).ref_count ) {
|
|
REMOTE_ThreadHandle(wid).ref_count++;
|
|
REMOTE_ThreadHandle(wid).pthread_handle = pthread_self();
|
|
set_system_thread_id(wid, SWI_thread_info(wid, NULL));
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
|
|
return TRUE;
|
|
}
|
|
REMOTE_ThreadHandle(wid).pthread_handle = pthread_self();
|
|
set_system_thread_id(wid, SWI_thread_info(wid, NULL));
|
|
REMOTE_ThreadHandle(wid).ref_count++;
|
|
pthread_setspecific(Yap_yaamregs_key, (const void *)REMOTE_ThreadHandle(wid).current_yaam_regs);
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
|
|
return TRUE;
|
|
}
|
|
|
|
Int
|
|
Yap_thread_detach_engine(int wid)
|
|
{
|
|
MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock));
|
|
//REMOTE_ThreadHandle(wid).pthread_handle = 0;
|
|
REMOTE_ThreadHandle(wid).ref_count--;
|
|
pthread_setspecific(Yap_yaamregs_key, NULL);
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
|
|
return TRUE;
|
|
}
|
|
|
|
Int
|
|
Yap_thread_destroy_engine(int wid)
|
|
{
|
|
MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock));
|
|
if (REMOTE_ThreadHandle(wid).ref_count == 0) {
|
|
kill_thread_engine(wid, TRUE);
|
|
return TRUE;
|
|
} else {
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
|
|
return FALSE;
|
|
}
|
|
}
|
|
|
|
|
|
static Int
|
|
p_thread_join( USES_REGS1 )
|
|
{
|
|
Int tid = IntegerOfTerm(Deref(ARG1));
|
|
pthread_t thread;
|
|
|
|
MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock));
|
|
if (!(REMOTE_ThreadHandle(tid).in_use ||
|
|
REMOTE_ThreadHandle(tid).zombie)) {
|
|
// he's dead, jim
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
|
|
return FALSE;
|
|
}
|
|
if (!REMOTE_ThreadHandle(tid).tdetach == MkAtomTerm(AtomTrue)) {
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
|
|
return FALSE;
|
|
}
|
|
thread = REMOTE_ThreadHandle(tid).pthread_handle;
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
|
|
/* make sure this lock is accessible */
|
|
if (pthread_join(thread, NULL) < 0) {
|
|
/* ERROR */
|
|
return FALSE;
|
|
}
|
|
return TRUE;
|
|
}
|
|
|
|
static Int
|
|
p_thread_destroy( USES_REGS1 )
|
|
{
|
|
Int tid = IntegerOfTerm(Deref(ARG1));
|
|
|
|
MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock));
|
|
REMOTE_ThreadHandle(tid).zombie = FALSE;
|
|
REMOTE_ThreadHandle(tid).in_use = FALSE;
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
|
|
return TRUE;
|
|
}
|
|
|
|
static Int
|
|
p_thread_detach( USES_REGS1 )
|
|
{
|
|
Int tid = IntegerOfTerm(Deref(ARG1));
|
|
MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock));
|
|
if (pthread_detach(REMOTE_ThreadHandle(tid).pthread_handle) < 0) {
|
|
/* ERROR */
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
|
|
return FALSE;
|
|
}
|
|
REMOTE_ThreadHandle(tid).tdetach =
|
|
MkAtomTerm(AtomTrue);
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
|
|
return TRUE;
|
|
}
|
|
|
|
static Int
|
|
p_thread_detached( USES_REGS1 )
|
|
{
|
|
if (LOCAL_ThreadHandle.tdetach)
|
|
return Yap_unify(ARG1,LOCAL_ThreadHandle.tdetach);
|
|
else
|
|
return FALSE;
|
|
}
|
|
|
|
static Int
|
|
p_thread_detached2( USES_REGS1 )
|
|
{
|
|
Int tid = IntegerOfTerm(Deref(ARG1));
|
|
if (REMOTE_ThreadHandle(tid).tdetach)
|
|
return Yap_unify(ARG2,REMOTE_ThreadHandle(tid).tdetach);
|
|
else
|
|
return FALSE;
|
|
}
|
|
|
|
static Int
|
|
p_thread_exit( USES_REGS1 )
|
|
{
|
|
thread_die(worker_id, FALSE);
|
|
pthread_exit(NULL);
|
|
/* done, just make gcc happy */
|
|
return TRUE;
|
|
}
|
|
|
|
static Int
|
|
p_thread_set_concurrency( USES_REGS1 )
|
|
{
|
|
#if HAVE_PTHREAD_GETCONCURRENCY
|
|
int newc;
|
|
int cur;
|
|
Term tnew = Deref(ARG2);
|
|
if (IsVarTerm(tnew)) {
|
|
newc = 0;
|
|
} else if (IsIntegerTerm(tnew)) {
|
|
newc = IntegerOfTerm(tnew);
|
|
} else {
|
|
Yap_Error(TYPE_ERROR_INTEGER,tnew,"thread_set_concurrency/2");
|
|
return(FALSE);
|
|
}
|
|
cur = MkIntegerTerm(pthread_getconcurrency());
|
|
if (pthread_setconcurrency(newc) != 0) {
|
|
return FALSE;
|
|
}
|
|
return Yap_unify(ARG1, MkIntegerTerm(cur));
|
|
#else
|
|
return FALSE;
|
|
#endif
|
|
}
|
|
|
|
static Int
|
|
p_thread_yield( USES_REGS1 )
|
|
{
|
|
if (sched_yield() != 0) {
|
|
return FALSE;
|
|
}
|
|
return TRUE;
|
|
}
|
|
|
|
static Int
|
|
p_valid_thread( USES_REGS1 )
|
|
{
|
|
Int i = IntegerOfTerm(Deref(ARG1));
|
|
return REMOTE_ThreadHandle(i).in_use || REMOTE_ThreadHandle(i).zombie;
|
|
}
|
|
|
|
/* Mutex Support */
|
|
|
|
typedef struct swi_mutex {
|
|
UInt owners;
|
|
Int tid_own;
|
|
MutexEntry *alias;
|
|
pthread_mutex_t m;
|
|
UInt timestamp;
|
|
struct swi_mutex *backbone; // chain of all mutexes
|
|
struct swi_mutex *prev, *next; // chain of locked mutexes
|
|
} SWIMutex;
|
|
|
|
static SWIMutex *NewMutex(void) {
|
|
SWIMutex* mutp;
|
|
pthread_mutexattr_t mat;
|
|
#if defined(HAVE_PTHREAD_MUTEXATTR_SETKIND_NP) && !defined(__MINGW32__)
|
|
extern int pthread_mutexattr_setkind_np(pthread_mutexattr_t *attr, int kind);
|
|
#endif
|
|
|
|
LOCK(GLOBAL_MUT_ACCESS);
|
|
mutp = GLOBAL_FreeMutexes;
|
|
while (mutp) {
|
|
if ((Int)(mutp->owners) < 0) {
|
|
// just making sure
|
|
break;
|
|
}
|
|
mutp = mutp->next;
|
|
}
|
|
if (mutp == NULL) {
|
|
mutp = (SWIMutex *)Yap_AllocCodeSpace(sizeof(SWIMutex));
|
|
if (mutp == NULL) {
|
|
UNLOCK(GLOBAL_MUT_ACCESS);
|
|
return NULL;
|
|
} else {
|
|
pthread_mutexattr_init(&mat);
|
|
mutp->timestamp = 0;
|
|
#if defined(HAVE_PTHREAD_MUTEXATTR_SETKIND_NP) && !defined(__MINGW32__)
|
|
pthread_mutexattr_setkind_np(&mat, PTHREAD_MUTEX_RECURSIVE_NP);
|
|
#else
|
|
#ifdef HAVE_PTHREAD_MUTEXATTR_SETTYPE
|
|
pthread_mutexattr_settype(&mat, PTHREAD_MUTEX_RECURSIVE);
|
|
#endif
|
|
#endif
|
|
pthread_mutex_init(&mutp->m, &mat);
|
|
}
|
|
mutp->backbone = GLOBAL_mutex_backbone;
|
|
GLOBAL_mutex_backbone = mutp;
|
|
} else {
|
|
// reuse existing mutex
|
|
mutp->timestamp++;
|
|
}
|
|
mutp->owners = 0;
|
|
mutp->tid_own = 0;
|
|
mutp->alias = NIL;
|
|
UNLOCK(GLOBAL_MUT_ACCESS);
|
|
return mutp;
|
|
}
|
|
|
|
#define MutexOfTerm(t) MutexOfTerm__(t PASS_REGS)
|
|
|
|
static SWIMutex *MutexOfTerm__(Term t USES_REGS){
|
|
Term t1 = Deref(t);
|
|
SWIMutex *mut = NULL;
|
|
|
|
if (IsVarTerm(t1)) {
|
|
Yap_Error(INSTANTIATION_ERROR, t1, "mutex operation");
|
|
return NULL;
|
|
} else if (IsApplTerm(t1) && FunctorOfTerm(t1) == FunctorMutex) {
|
|
mut = AddressOfTerm(ArgOfTerm(1,t1));
|
|
if ((Int)(mut->owners) < 0 ||
|
|
IntegerOfTerm(ArgOfTerm(2,t1)) != mut->timestamp) {
|
|
Yap_Error(EXISTENCE_ERROR_MUTEX, t1, "mutex access");
|
|
return NULL;
|
|
}
|
|
} else if (IsAtomTerm(t1)) {
|
|
mut = Yap_GetMutexFromProp(AtomOfTerm(t1));
|
|
if (!mut) {
|
|
mut = NewMutex();
|
|
if ( !Yap_PutAtomMutex( AtomOfTerm(t1), mut ) ) {
|
|
return NULL;
|
|
}
|
|
}
|
|
}
|
|
return mut;
|
|
}
|
|
|
|
static Int
|
|
p_new_mutex( USES_REGS1 ){
|
|
SWIMutex* mutp;
|
|
Term t1;
|
|
if (IsVarTerm((t1 = Deref(ARG1)))) {
|
|
Term ts[2];
|
|
|
|
if (!(mutp = NewMutex()))
|
|
return FALSE;
|
|
ts[0] = MkAddressTerm(mutp);
|
|
ts[1] = MkIntegerTerm(mutp->timestamp);
|
|
if (Yap_unify(ARG1, Yap_MkApplTerm(FunctorMutex, 2, ts) ) ) {
|
|
return TRUE;
|
|
}
|
|
Yap_Error(UNINSTANTIATION_ERROR, t1, "mutex_create on an existing mutex");
|
|
return FALSE;
|
|
} else if(IsAtomTerm(t1)) {
|
|
if (!(mutp = NewMutex()))
|
|
return FALSE;
|
|
return Yap_PutAtomMutex( AtomOfTerm(t1), mutp );
|
|
} else if (IsApplTerm(t1) && FunctorOfTerm(t1) == FunctorMutex) {
|
|
Yap_Error(UNINSTANTIATION_ERROR, t1, "mutex_create on an existing mutex");
|
|
return FALSE;
|
|
}
|
|
return FALSE;
|
|
}
|
|
|
|
/** @pred mutex_destroy(+ _MutexId_)
|
|
|
|
Destroy a mutex. After this call, _MutexId_ becomes invalid and
|
|
further references yield an `existence_error` exception.
|
|
*/
|
|
static Int p_destroy_mutex( USES_REGS1 )
|
|
{
|
|
SWIMutex *mut = MutexOfTerm(Deref(ARG1));
|
|
if (!mut)
|
|
return FALSE;
|
|
if (pthread_mutex_destroy(&mut->m) < 0)
|
|
return FALSE;
|
|
if (mut->alias) {
|
|
mut->alias->Mutex = NULL;
|
|
}
|
|
mut->owners = -1;
|
|
mut->tid_own = -1;
|
|
LOCK(GLOBAL_MUT_ACCESS);
|
|
if (GLOBAL_FreeMutexes)
|
|
mut->prev = GLOBAL_FreeMutexes->prev;
|
|
else
|
|
mut->prev = NULL;
|
|
mut->next = GLOBAL_FreeMutexes;
|
|
GLOBAL_FreeMutexes = mut;
|
|
UNLOCK(GLOBAL_MUT_ACCESS);
|
|
return TRUE;
|
|
}
|
|
|
|
static bool
|
|
LockMutex( SWIMutex *mut USES_REGS)
|
|
{
|
|
#if DEBUG_LOCKS
|
|
MUTEX_LOCK(&mut->m);
|
|
#else
|
|
if (MUTEX_LOCK(&mut->m) < 0)
|
|
return FALSE;
|
|
#endif
|
|
mut->owners++;
|
|
mut->tid_own = worker_id;
|
|
if (LOCAL_Mutexes)
|
|
mut->prev = LOCAL_Mutexes->prev;
|
|
else
|
|
mut->prev = NULL;
|
|
mut->next = LOCAL_Mutexes;
|
|
LOCAL_Mutexes = NULL;
|
|
return true;
|
|
}
|
|
|
|
static bool
|
|
UnLockMutex( SWIMutex *mut USES_REGS)
|
|
{
|
|
#if DEBUG_LOCKS
|
|
MUTEX_UNLOCK(&mut->m);
|
|
#else
|
|
if (MUTEX_UNLOCK(&mut->m) < 0)
|
|
return FALSE;
|
|
#endif
|
|
mut->owners--;
|
|
if (mut->prev) {
|
|
mut->prev->next = mut->next;
|
|
} else {
|
|
LOCAL_Mutexes = mut->next;
|
|
if (mut->next)
|
|
mut->next->prev = NULL;
|
|
}
|
|
if (mut->next)
|
|
mut->next->prev = mut->prev;
|
|
return true;
|
|
}
|
|
|
|
/** @pred mutex_lock(+ _MutexId_)
|
|
|
|
|
|
Lock the mutex. Prolog mutexes are <em>recursive</em> mutexes: they
|
|
can be locked multiple times by the same thread. Only after unlocking
|
|
it as many times as it is locked, the mutex becomes available for
|
|
locking by other threads. If another thread has locked the mutex the
|
|
calling thread is suspended until to mutex is unlocked.
|
|
|
|
If _MutexId_ is an atom, and there is no current mutex with that
|
|
name, the mutex is created automatically using mutex_create/1. This
|
|
implies named mutexes need not be declared explicitly.
|
|
|
|
Please note that locking and unlocking mutexes should be paired
|
|
carefully. Especially make sure to unlock mutexes even if the protected
|
|
code fails or raises an exception. For most common cases use
|
|
with_mutex/2, which provides a safer way for handling Prolog-level
|
|
mutexes.
|
|
|
|
|
|
*/
|
|
static Int
|
|
p_lock_mutex( USES_REGS1 )
|
|
{
|
|
SWIMutex *mut = MutexOfTerm(Deref(ARG1));
|
|
if (!mut || !LockMutex( mut PASS_REGS))
|
|
return FALSE;
|
|
return TRUE;
|
|
}
|
|
|
|
/** @pred mutex_trylock(+ _MutexId_)
|
|
|
|
|
|
As mutex_lock/1, but if the mutex is held by another thread, this
|
|
predicates fails immediately.
|
|
|
|
|
|
*/
|
|
static Int
|
|
p_trylock_mutex( USES_REGS1 )
|
|
{
|
|
SWIMutex *mut = MutexOfTerm(Deref(ARG1));
|
|
if (!mut)
|
|
return FALSE;
|
|
|
|
if (MUTEX_TRYLOCK(&mut->m) == EBUSY)
|
|
return FALSE;
|
|
mut->owners++;
|
|
mut->tid_own = worker_id;
|
|
return TRUE;
|
|
}
|
|
|
|
/** @pred mutex_unlock(+ _MutexId_)
|
|
|
|
|
|
Unlock the mutex. This can only be called if the mutex is held by the
|
|
calling thread. If this is not the case, a `permission_error`
|
|
exception is raised.
|
|
|
|
|
|
*/
|
|
static Int
|
|
p_unlock_mutex( USES_REGS1 )
|
|
{
|
|
SWIMutex *mut = MutexOfTerm(Deref(ARG1));
|
|
if (!mut || !UnLockMutex( mut PASS_REGS))
|
|
return FALSE;
|
|
return TRUE;
|
|
}
|
|
|
|
/** @pred with_mutex(+ _MutexId_, : _Goal_)
|
|
|
|
|
|
Execute _Goal_ while holding _MutexId_. If _Goal_ leaves
|
|
choicepoints, these are destroyed (as in once/1). The mutex is unlocked
|
|
regardless of whether _Goal_ succeeds, fails or raises an exception.
|
|
An exception thrown by _Goal_ is re-thrown after the mutex has been
|
|
successfully unlocked. See also `mutex_create/2`.
|
|
|
|
Although described in the thread-section, this predicate is also
|
|
available in the single-threaded version, where it behaves simply as
|
|
once/1.
|
|
|
|
|
|
*/
|
|
|
|
static Int
|
|
p_with_mutex( USES_REGS1 )
|
|
{
|
|
Term excep;
|
|
Int rc = FALSE;
|
|
Int creeping = Yap_get_signal(YAP_CREEP_SIGNAL);
|
|
PredEntry *pe;
|
|
Term tm = CurrentModule;
|
|
Term tg = Deref(ARG2);
|
|
SWIMutex *mut = MutexOfTerm( ARG1 );
|
|
|
|
if (!mut || !LockMutex(mut PASS_REGS)) {
|
|
return FALSE;
|
|
}
|
|
|
|
tg = Yap_StripModule(tg, &tm);
|
|
if (IsVarTerm(tg)) {
|
|
Yap_Error(INSTANTIATION_ERROR, ARG2, "with_mutex/2");
|
|
goto end;
|
|
} else if (IsApplTerm(tg)) {
|
|
register Functor f = FunctorOfTerm(tg);
|
|
register CELL *pt;
|
|
size_t i, arity;
|
|
|
|
f = FunctorOfTerm(tg);
|
|
if (IsExtensionFunctor(f)) {
|
|
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
|
|
goto end;
|
|
}
|
|
arity = ArityOfFunctor(f);
|
|
if (arity > MaxTemps) {
|
|
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
|
|
goto end;
|
|
}
|
|
pe = RepPredProp(PredPropByFunc(f, tm));
|
|
pt = RepAppl(tg)+1;
|
|
for (i= 0; i < arity; i++ )
|
|
XREGS[i+1] = pt[i];
|
|
} else if (IsAtomTerm(tg)) {
|
|
pe = RepPredProp(PredPropByAtom(AtomOfTerm(tg), tm));
|
|
} else if (IsPairTerm(tg)) {
|
|
register CELL *pt;
|
|
Functor f;
|
|
|
|
f = FunctorDot;
|
|
pe = RepPredProp(PredPropByFunc(f, tm));
|
|
pt = RepPair(tg);
|
|
XREGS[1] = pt[0];
|
|
XREGS[2] = pt[1];
|
|
} else {
|
|
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
|
|
goto end;
|
|
}
|
|
if (
|
|
pe->OpcodeOfPred != FAIL_OPCODE &&
|
|
Yap_execute_pred(pe, NULL PASS_REGS) ) {
|
|
rc = TRUE;
|
|
}
|
|
end:
|
|
excep = Yap_GetException();
|
|
if ( !UnLockMutex(mut PASS_REGS) ) {
|
|
return FALSE;
|
|
}
|
|
if (creeping) {
|
|
Yap_signal( YAP_CREEP_SIGNAL );
|
|
} else if ( excep != 0) {
|
|
return Yap_JumpToEnv(excep);
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
|
|
static Int
|
|
p_with_with_mutex( USES_REGS1 )
|
|
{
|
|
if (GLOBAL_WithMutex == NULL) {
|
|
p_new_mutex( PASS_REGS1 );
|
|
GLOBAL_WithMutex = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
|
|
} else {
|
|
ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex);
|
|
}
|
|
return p_lock_mutex( PASS_REGS1 );
|
|
}
|
|
|
|
static Int
|
|
p_unlock_with_mutex( USES_REGS1 )
|
|
{
|
|
ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex);
|
|
return p_unlock_mutex( PASS_REGS1 );
|
|
}
|
|
|
|
static Int
|
|
p_mutex_info( USES_REGS1 )
|
|
{
|
|
SWIMutex *mut = MutexOfTerm(Deref(ARG1));
|
|
if (!mut)
|
|
return FALSE;
|
|
|
|
return Yap_unify(ARG2, MkIntegerTerm(mut->owners)) &&
|
|
Yap_unify(ARG3, MkIntegerTerm(mut->tid_own));
|
|
return TRUE;
|
|
}
|
|
|
|
typedef struct {
|
|
UInt indx;
|
|
mbox_t mbox;
|
|
} counted_mbox;
|
|
|
|
static Int
|
|
p_mbox_create( USES_REGS1 )
|
|
{
|
|
Term namet = Deref(ARG1);
|
|
mbox_t* mboxp = GLOBAL_named_mboxes;
|
|
|
|
if (IsVarTerm(namet)) {
|
|
AtomEntry *ae;
|
|
int new;
|
|
mbox_t mbox;
|
|
|
|
ae = Yap_lookupBlob(&mbox, sizeof(mbox), &PL_Message_Queue, &new);
|
|
namet = MkAtomTerm(RepAtom(ae));
|
|
mboxp = (mbox_t *)(ae->rep.blob[0].data);
|
|
Yap_unify(ARG1, namet);
|
|
LOCK(GLOBAL_mboxq_lock);
|
|
} else if (IsAtomTerm(namet)) {
|
|
LOCK(GLOBAL_mboxq_lock);
|
|
while( mboxp && mboxp->name != namet)
|
|
mboxp = mboxp->next;
|
|
if (mboxp) {
|
|
UNLOCK(GLOBAL_mboxq_lock);
|
|
return FALSE;
|
|
}
|
|
mboxp = (mbox_t *)Yap_AllocCodeSpace(sizeof(mbox_t));
|
|
if (mboxp == NULL) {
|
|
UNLOCK(GLOBAL_mboxq_lock);
|
|
return FALSE;
|
|
}
|
|
// global mbox, for now we'll just insert in list
|
|
mboxp->next = GLOBAL_named_mboxes;
|
|
GLOBAL_named_mboxes = mboxp;
|
|
}
|
|
bool rc = mboxCreate( namet, mboxp PASS_REGS );
|
|
UNLOCK(GLOBAL_mboxq_lock);
|
|
return rc;
|
|
}
|
|
|
|
|
|
static Int
|
|
p_mbox_destroy( USES_REGS1 )
|
|
{
|
|
Term namet = Deref(ARG1);
|
|
mbox_t* mboxp = GLOBAL_named_mboxes, *prevp;
|
|
|
|
if (IsVarTerm(namet) )
|
|
return FALSE;
|
|
if (IsIntTerm(namet) ) {
|
|
return FALSE;
|
|
}
|
|
LOCK(GLOBAL_mboxq_lock);
|
|
prevp = NULL;
|
|
while( mboxp && mboxp->name != namet) {
|
|
prevp = mboxp;
|
|
mboxp = mboxp->next;
|
|
}
|
|
if (!mboxp) {
|
|
UNLOCK(GLOBAL_mboxq_lock);
|
|
return FALSE;
|
|
}
|
|
if (mboxp == GLOBAL_named_mboxes) {
|
|
GLOBAL_named_mboxes = mboxp->next;
|
|
} else {
|
|
prevp->next = mboxp->next;
|
|
}
|
|
UNLOCK(GLOBAL_mboxq_lock);
|
|
mboxDestroy(mboxp PASS_REGS);
|
|
Yap_FreeCodeSpace( (char *)mboxp );
|
|
return TRUE;
|
|
}
|
|
|
|
static mbox_t*
|
|
getMbox(Term t)
|
|
{
|
|
mbox_t* mboxp;
|
|
|
|
if (IsAtomTerm(t=Deref(t))) {
|
|
Atom at = AtomOfTerm(t);
|
|
LOCK(GLOBAL_mboxq_lock);
|
|
if (IsBlob(at)) {
|
|
mboxp = (mbox_t *)(RepAtom(at)->rep.blob[0].data);
|
|
} else {
|
|
mboxp = GLOBAL_named_mboxes;
|
|
while( mboxp && mboxp->name != t) {
|
|
mboxp = mboxp->next;
|
|
}
|
|
}
|
|
if (!mboxp->open)
|
|
mboxp = NULL;
|
|
if (mboxp) {
|
|
pthread_mutex_lock(& mboxp->mutex);
|
|
}
|
|
UNLOCK(GLOBAL_mboxq_lock);
|
|
} else if (IsIntTerm(t)) {
|
|
int wid = IntOfTerm(t);
|
|
if (REMOTE(wid) &&
|
|
(REMOTE_ThreadHandle(wid).in_use || REMOTE_ThreadHandle(wid).zombie))
|
|
{
|
|
return &REMOTE_ThreadHandle(wid).mbox_handle;
|
|
} else {
|
|
return NULL;
|
|
}
|
|
if (!mboxp->open)
|
|
mboxp = NULL;
|
|
if (mboxp) {
|
|
pthread_mutex_lock(& mboxp->mutex);
|
|
}
|
|
} else {
|
|
return NULL;
|
|
}
|
|
return mboxp;
|
|
}
|
|
|
|
|
|
static Int
|
|
p_mbox_send( USES_REGS1 )
|
|
{
|
|
Term namet = Deref(ARG1);
|
|
mbox_t* mboxp = getMbox(namet) ;
|
|
|
|
if (!mboxp)
|
|
return FALSE;
|
|
return mboxSend(mboxp, Deref(ARG2) PASS_REGS);
|
|
}
|
|
|
|
static Int
|
|
p_mbox_size( USES_REGS1 )
|
|
{
|
|
Term namet = Deref(ARG1);
|
|
mbox_t* mboxp = getMbox(namet) ;
|
|
|
|
if (!mboxp)
|
|
return FALSE;
|
|
return Yap_unify( ARG2, MkIntTerm(mboxp->nmsgs));
|
|
}
|
|
|
|
|
|
static Int
|
|
p_mbox_receive( USES_REGS1 )
|
|
{
|
|
Term namet = Deref(ARG1);
|
|
mbox_t* mboxp = getMbox(namet) ;
|
|
|
|
if (!mboxp)
|
|
return FALSE;
|
|
return mboxReceive(mboxp, Deref(ARG2) PASS_REGS);
|
|
}
|
|
|
|
|
|
static Int
|
|
p_mbox_peek( USES_REGS1 )
|
|
{
|
|
Term namet = Deref(ARG1);
|
|
mbox_t* mboxp = getMbox(namet) ;
|
|
|
|
if (!mboxp)
|
|
return FALSE;
|
|
return mboxPeek(mboxp, Deref(ARG2) PASS_REGS);
|
|
}
|
|
|
|
static Int
|
|
p_cond_create( USES_REGS1 )
|
|
{
|
|
pthread_cond_t* condp;
|
|
|
|
condp = (pthread_cond_t *)Yap_AllocCodeSpace(sizeof(pthread_cond_t));
|
|
if (condp == NULL) {
|
|
return FALSE;
|
|
}
|
|
pthread_cond_init(condp, NULL);
|
|
return Yap_unify(ARG1, MkIntegerTerm((Int)condp));
|
|
}
|
|
|
|
static Int
|
|
p_cond_destroy( USES_REGS1 )
|
|
{
|
|
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
|
|
|
|
if (pthread_cond_destroy(condp) < 0)
|
|
return FALSE;
|
|
Yap_FreeCodeSpace((void *)condp);
|
|
return TRUE;
|
|
}
|
|
|
|
static Int
|
|
p_cond_signal( USES_REGS1 )
|
|
{
|
|
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
|
|
|
|
if (pthread_cond_signal(condp) < 0)
|
|
return FALSE;
|
|
return TRUE;
|
|
}
|
|
|
|
static Int
|
|
p_cond_broadcast( USES_REGS1 )
|
|
{
|
|
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
|
|
|
|
if (pthread_cond_broadcast(condp) < 0)
|
|
return FALSE;
|
|
return TRUE;
|
|
}
|
|
|
|
static Int
|
|
p_cond_wait( USES_REGS1 )
|
|
{
|
|
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
|
|
SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG2));
|
|
pthread_cond_wait(condp, &mut->m);
|
|
return TRUE;
|
|
}
|
|
|
|
static Int
|
|
p_thread_stacks( USES_REGS1 )
|
|
{ /* '$thread_signal'(+P) */
|
|
Int tid = IntegerOfTerm(Deref(ARG1));
|
|
Int status= TRUE;
|
|
|
|
MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock));
|
|
if (REMOTE(tid) &&
|
|
(REMOTE_ThreadHandle(tid).in_use || REMOTE_ThreadHandle(tid).zombie)) {
|
|
status &= Yap_unify(ARG2,MkIntegerTerm(REMOTE_ThreadHandle(tid).ssize));
|
|
status &= Yap_unify(ARG3,MkIntegerTerm(REMOTE_ThreadHandle(tid).tsize));
|
|
status &= Yap_unify(ARG4,MkIntegerTerm(REMOTE_ThreadHandle(tid).sysize));
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
|
|
return status;
|
|
}
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
|
|
return FALSE;
|
|
}
|
|
|
|
static Int
|
|
p_thread_atexit( USES_REGS1 )
|
|
{ /* '$thread_signal'(+P) */
|
|
Term t;
|
|
|
|
if (LOCAL_ThreadHandle.texit == NULL ||
|
|
LOCAL_ThreadHandle.texit->Entry == MkAtomTerm(AtomTrue)) {
|
|
return FALSE;
|
|
}
|
|
do {
|
|
t = Yap_PopTermFromDB(LOCAL_ThreadHandle.texit);
|
|
if (t == 0) {
|
|
if (LOCAL_Error_TYPE == OUT_OF_ATTVARS_ERROR) {
|
|
LOCAL_Error_TYPE = YAP_NO_ERROR;
|
|
if (!Yap_growglobal(NULL)) {
|
|
Yap_Error(OUT_OF_ATTVARS_ERROR, TermNil, LOCAL_ErrorMessage);
|
|
thread_die(worker_id, FALSE);
|
|
return FALSE;
|
|
}
|
|
} else {
|
|
LOCAL_Error_TYPE = YAP_NO_ERROR;
|
|
if (!Yap_growstack(LOCAL_ThreadHandle.tgoal->NOfCells*CellSize)) {
|
|
Yap_Error(OUT_OF_STACK_ERROR, TermNil, LOCAL_ErrorMessage);
|
|
thread_die(worker_id, FALSE);
|
|
return FALSE;
|
|
}
|
|
}
|
|
}
|
|
} while (t == 0);
|
|
LOCAL_ThreadHandle.texit = NULL;
|
|
return Yap_unify(ARG1, t) && Yap_unify(ARG2, LOCAL_ThreadHandle.texit_mod);
|
|
}
|
|
|
|
|
|
|
|
static Int
|
|
p_thread_signal( USES_REGS1 )
|
|
{ /* '$thread_signal'(+P) */
|
|
Int wid = IntegerOfTerm(Deref(ARG1));
|
|
/* make sure the lock is available */
|
|
MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock));
|
|
if (!REMOTE_ThreadHandle(wid).in_use ||
|
|
!REMOTE_ThreadHandle(wid).current_yaam_regs) {
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
|
|
return TRUE;
|
|
}
|
|
Yap_external_signal( wid, YAP_ITI_SIGNAL );
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
|
|
return TRUE;
|
|
}
|
|
|
|
static Int
|
|
p_no_threads( USES_REGS1 )
|
|
{ /* '$thread_signal'(+P) */
|
|
return FALSE;
|
|
}
|
|
|
|
static Int
|
|
p_nof_threads( USES_REGS1 )
|
|
{ /* '$nof_threads'(+P) */
|
|
int i = 0, wid;
|
|
LOCK(GLOBAL_ThreadHandlesLock);
|
|
for (wid = 0; wid < MAX_THREADS; wid++) {
|
|
if (!Yap_local[wid]) break;
|
|
if (REMOTE_ThreadHandle(wid).in_use)
|
|
i++;
|
|
}
|
|
UNLOCK(GLOBAL_ThreadHandlesLock);
|
|
return Yap_unify(ARG1,MkIntegerTerm(i));
|
|
}
|
|
|
|
static Int
|
|
p_max_workers( USES_REGS1 )
|
|
{ /* '$max_workers'(+P) */
|
|
return Yap_unify(ARG1,MkIntegerTerm(MAX_WORKERS));
|
|
}
|
|
|
|
static Int
|
|
p_max_threads( USES_REGS1 )
|
|
{ /* '$max_threads'(+P) */
|
|
return Yap_unify(ARG1,MkIntegerTerm(MAX_THREADS));
|
|
}
|
|
|
|
static Int
|
|
p_nof_threads_created( USES_REGS1 )
|
|
{ /* '$nof_threads'(+P) */
|
|
return Yap_unify(ARG1,MkIntTerm(GLOBAL_NOfThreadsCreated));
|
|
}
|
|
|
|
static Int
|
|
p_thread_runtime( USES_REGS1 )
|
|
{ /* '$thread_runtime'(+P) */
|
|
return Yap_unify(ARG1,MkIntegerTerm(GLOBAL_ThreadsTotalTime));
|
|
}
|
|
|
|
static Int
|
|
p_thread_self_lock( USES_REGS1 )
|
|
{ /* '$thread_unlock' */
|
|
MUTEX_LOCK(&(LOCAL_ThreadHandle.tlock));
|
|
return Yap_unify(ARG1,MkIntegerTerm(worker_id));
|
|
}
|
|
|
|
static Int
|
|
p_thread_unlock( USES_REGS1 )
|
|
{ /* '$thread_unlock' */
|
|
Int wid = IntegerOfTerm(Deref(ARG1));
|
|
MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
|
|
return TRUE;
|
|
}
|
|
|
|
intptr_t
|
|
system_thread_id(PL_thread_info_t *info)
|
|
{ if ( !info )
|
|
{ CACHE_REGS
|
|
if ( LOCAL )
|
|
info = SWI_thread_info(worker_id, NULL);
|
|
else
|
|
return -1;
|
|
}
|
|
#ifdef __linux__
|
|
return info->pid;
|
|
#else
|
|
#ifdef __WINDOWS__
|
|
return info->w32id;
|
|
#else
|
|
return (intptr_t)info->tid;
|
|
#endif
|
|
#endif
|
|
}
|
|
|
|
void
|
|
Yap_InitFirstWorkerThreadHandle(void)
|
|
{
|
|
CACHE_REGS
|
|
set_system_thread_id(0, NULL);
|
|
LOCAL_ThreadHandle.id = 0;
|
|
LOCAL_ThreadHandle.in_use = TRUE;
|
|
LOCAL_ThreadHandle.default_yaam_regs =
|
|
&Yap_standard_regs;
|
|
LOCAL_ThreadHandle.current_yaam_regs =
|
|
&Yap_standard_regs;
|
|
LOCAL_PL_local_data_p->reg_cache =
|
|
&Yap_standard_regs;
|
|
LOCAL_ThreadHandle.pthread_handle = pthread_self();
|
|
pthread_mutex_init(&REMOTE_ThreadHandle(0).tlock, NULL);
|
|
pthread_mutex_init(&REMOTE_ThreadHandle(0).tlock_status, NULL);
|
|
LOCAL_ThreadHandle.tdetach = MkAtomTerm(AtomFalse);
|
|
LOCAL_ThreadHandle.ref_count = 1;
|
|
}
|
|
|
|
FILE *debugf;
|
|
|
|
void Yap_InitThreadPreds(void)
|
|
{
|
|
|
|
|
|
Yap_InitCPred("$no_threads", 0, p_no_threads, 0);
|
|
Yap_InitCPred("$max_workers", 1, p_max_workers, 0);
|
|
Yap_InitCPred("$max_threads", 1, p_max_threads, 0);
|
|
Yap_InitCPred("$thread_new_tid", 1, p_thread_new_tid, 0);
|
|
Yap_InitCPred("$create_thread", 7, p_create_thread, 0);
|
|
Yap_InitCPred("$thread_self", 1, p_thread_self, SafePredFlag);
|
|
Yap_InitCPred("$thread_status_lock", 1, p_thread_status_lock, SafePredFlag);
|
|
Yap_InitCPred("$thread_status_unlock", 1, p_thread_status_unlock, SafePredFlag);
|
|
Yap_InitCPred("$thread_zombie_self", 1, p_thread_zombie_self, SafePredFlag);
|
|
Yap_InitCPred("$thread_join", 1, p_thread_join, 0);
|
|
Yap_InitCPred("$thread_destroy", 1, p_thread_destroy, 0);
|
|
Yap_InitCPred("thread_yield", 0, p_thread_yield, 0);
|
|
/** @pred thread_yield
|
|
|
|
|
|
Voluntarily relinquish the processor.
|
|
|
|
|
|
*/
|
|
Yap_InitCPred("$detach_thread", 1, p_thread_detach, 0);
|
|
Yap_InitCPred("$thread_detached", 1, p_thread_detached, 0);
|
|
Yap_InitCPred("$thread_detached", 2, p_thread_detached2, 0);
|
|
Yap_InitCPred("$thread_exit", 0, p_thread_exit, 0);
|
|
Yap_InitCPred("thread_setconcurrency", 2, p_thread_set_concurrency, 0);
|
|
/** @pred thread_setconcurrency(+ _Old_, - _New_)
|
|
|
|
|
|
Determine the concurrency of the process, which is defined as the
|
|
maximum number of concurrently active threads. `Active` here means
|
|
they are using CPU time. This option is provided if the
|
|
thread-implementation provides
|
|
`pthread_setconcurrency()`. Solaris is a typical example of this
|
|
family. On other systems this predicate unifies _Old_ to 0 (zero)
|
|
and succeeds silently.
|
|
|
|
|
|
*/
|
|
Yap_InitCPred("$valid_thread", 1, p_valid_thread, 0);
|
|
Yap_InitCPred("mutex_create", 1, p_new_mutex, SafePredFlag);
|
|
Yap_InitCPred("mutex_destroy", 1, p_destroy_mutex, SafePredFlag);
|
|
Yap_InitCPred("mutex_lock", 1, p_lock_mutex, SafePredFlag);
|
|
Yap_InitCPred("mutex_trylock", 1, p_trylock_mutex, SafePredFlag);
|
|
Yap_InitCPred("mutex_unlock", 1, p_unlock_mutex, SafePredFlag);
|
|
Yap_InitCPred("with_mutex", 2, p_with_mutex, MetaPredFlag);
|
|
Yap_InitCPred("$with_with_mutex", 1, p_with_with_mutex, 0);
|
|
Yap_InitCPred("$unlock_with_mutex", 1, p_unlock_with_mutex, 0);
|
|
Yap_InitCPred("$mutex_info", 3, p_mutex_info, SafePredFlag);
|
|
Yap_InitCPred("$cond_create", 1, p_cond_create, SafePredFlag);
|
|
Yap_InitCPred("$cond_destroy", 1, p_cond_destroy, SafePredFlag);
|
|
Yap_InitCPred("$cond_signal", 1, p_cond_signal, SafePredFlag);
|
|
Yap_InitCPred("$cond_broadcast", 1, p_cond_broadcast, SafePredFlag);
|
|
Yap_InitCPred("$cond_wait", 2, p_cond_wait, SafePredFlag);
|
|
Yap_InitCPred("$message_queue_create", 1, p_mbox_create, SafePredFlag);
|
|
Yap_InitCPred("$message_queue_destroy", 1, p_mbox_destroy, SafePredFlag);
|
|
Yap_InitCPred("$message_queue_send", 2, p_mbox_send, SafePredFlag);
|
|
Yap_InitCPred("$message_queue_receive", 2, p_mbox_receive, SafePredFlag);
|
|
Yap_InitCPred("$message_queue_size", 2, p_mbox_size, SafePredFlag);
|
|
Yap_InitCPred("$message_queue_peek", 2, p_mbox_peek, SafePredFlag);
|
|
Yap_InitCPred("$thread_stacks", 4, p_thread_stacks, SafePredFlag);
|
|
Yap_InitCPred("$signal_thread", 1, p_thread_signal, SafePredFlag);
|
|
Yap_InitCPred("$nof_threads", 1, p_nof_threads, SafePredFlag);
|
|
Yap_InitCPred("$nof_threads_created", 1, p_nof_threads_created, SafePredFlag);
|
|
Yap_InitCPred("$thread_sleep", 4, p_thread_sleep, SafePredFlag);
|
|
Yap_InitCPred("$thread_runtime", 1, p_thread_runtime, SafePredFlag);
|
|
Yap_InitCPred("$thread_self_lock", 1, p_thread_self_lock, SafePredFlag);
|
|
Yap_InitCPred("$thread_run_at_exit", 2, p_thread_atexit, SafePredFlag);
|
|
Yap_InitCPred("$thread_unlock", 1, p_thread_unlock, SafePredFlag);
|
|
#if DEBUG_LOCKS||DEBUG_PE_LOCKS
|
|
Yap_InitCPred("debug_locks", 0, p_debug_locks, SafePredFlag);
|
|
Yap_InitCPred("nodebug_locks", 0, p_nodebug_locks, SafePredFlag);
|
|
#endif
|
|
}
|
|
|
|
#else
|
|
|
|
int
|
|
Yap_NOfThreads(void) {
|
|
// GLOBAL_ThreadHandlesLock is held
|
|
#ifdef YAPOR
|
|
return 2;
|
|
#else
|
|
return 1;
|
|
#endif
|
|
}
|
|
|
|
|
|
static Int
|
|
p_no_threads(void)
|
|
{ /* '$thread_signal'(+P) */
|
|
return TRUE;
|
|
}
|
|
|
|
static Int
|
|
p_nof_threads(void)
|
|
{ /* '$nof_threads'(+P) */
|
|
return Yap_unify(ARG1,MkIntTerm(1));
|
|
}
|
|
|
|
static Int
|
|
p_max_threads(void)
|
|
{ /* '$nof_threads'(+P) */
|
|
return Yap_unify(ARG1,MkIntTerm(1));
|
|
}
|
|
|
|
static Int
|
|
p_nof_threads_created(void)
|
|
{ /* '$nof_threads'(+P) */
|
|
return Yap_unify(ARG1,MkIntTerm(1));
|
|
}
|
|
|
|
static Int
|
|
p_thread_runtime(void)
|
|
{ /* '$thread_runtime'(+P) */
|
|
return Yap_unify(ARG1,MkIntTerm(0));
|
|
}
|
|
|
|
static Int
|
|
p_thread_self(void)
|
|
{ /* '$thread_runtime'(+P) */
|
|
return Yap_unify(ARG1,MkIntTerm(0));
|
|
}
|
|
|
|
static Int
|
|
p_thread_stacks(void)
|
|
{ /* '$thread_runtime'(+P) */
|
|
return FALSE;
|
|
}
|
|
|
|
static Int
|
|
p_thread_unlock(void)
|
|
{ /* '$thread_runtime'(+P) */
|
|
return TRUE;
|
|
}
|
|
|
|
static Int
|
|
p_max_workers(void)
|
|
{ /* '$max_workers'(+P) */
|
|
return Yap_unify(ARG1,MkIntTerm(1));
|
|
}
|
|
|
|
static Int
|
|
p_new_mutex(void)
|
|
{ /* '$max_workers'(+P) */
|
|
static int mutexes = 1;
|
|
return Yap_unify(ARG1, MkIntegerTerm(mutexes++) );
|
|
}
|
|
|
|
static Int
|
|
p_with_mutex( USES_REGS1 )
|
|
{
|
|
Int mut;
|
|
Term t1 = Deref(ARG1), excep;
|
|
Int rc = FALSE;
|
|
Int creeping = Yap_get_signal(YAP_CREEP_SIGNAL);
|
|
PredEntry *pe;
|
|
Term tm = CurrentModule;
|
|
Term tg = Deref(ARG2);
|
|
|
|
if (IsVarTerm(t1)) {
|
|
p_new_mutex( PASS_REGS1 );
|
|
}
|
|
t1 = Deref(ARG1);
|
|
mut = IntOfTerm(t1);
|
|
tg = Yap_StripModule(tg, &tm);
|
|
if (IsVarTerm(tg)) {
|
|
Yap_Error(INSTANTIATION_ERROR, ARG2, "with_mutex/2");
|
|
goto end;
|
|
} else if (IsApplTerm(tg)) {
|
|
register Functor f = FunctorOfTerm(tg);
|
|
register CELL *pt;
|
|
size_t i, arity;
|
|
|
|
f = FunctorOfTerm(tg);
|
|
if (IsExtensionFunctor(f)) {
|
|
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
|
|
goto end;
|
|
}
|
|
arity = ArityOfFunctor(f);
|
|
if (arity > MaxTemps) {
|
|
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
|
|
goto end;
|
|
}
|
|
pe = RepPredProp(PredPropByFunc(f, tm));
|
|
pt = RepAppl(tg)+1;
|
|
for (i= 0; i < arity; i++ )
|
|
XREGS[i+1] = pt[i];
|
|
} else if (IsAtomTerm(tg)) {
|
|
pe = RepPredProp(PredPropByAtom(AtomOfTerm(tg), tm));
|
|
} else if (IsPairTerm(tg)) {
|
|
register CELL *pt;
|
|
Functor f;
|
|
|
|
f = FunctorDot;
|
|
pe = RepPredProp(PredPropByFunc(f, tm));
|
|
pt = RepPair(tg);
|
|
XREGS[1] = pt[0];
|
|
XREGS[2] = pt[1];
|
|
} else {
|
|
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
|
|
goto end;
|
|
}
|
|
if (
|
|
pe->OpcodeOfPred != FAIL_OPCODE &&
|
|
Yap_execute_pred(pe, NULL PASS_REGS) ) {
|
|
rc = TRUE;
|
|
}
|
|
end:
|
|
ARG1 = MkIntegerTerm(mut);
|
|
excep = Yap_GetException();
|
|
if (creeping) {
|
|
Yap_signal( YAP_CREEP_SIGNAL );
|
|
} else if ( excep != 0) {
|
|
return Yap_JumpToEnv(excep);
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
void
|
|
Yap_InitFirstWorkerThreadHandle(void)
|
|
{
|
|
}
|
|
|
|
void Yap_InitThreadPreds(void)
|
|
{
|
|
Yap_InitCPred("with_mutex", 2, p_with_mutex, MetaPredFlag);
|
|
Yap_InitCPred("mutex_create", 1, p_new_mutex, SafePredFlag);
|
|
Yap_InitCPred("$max_workers", 1, p_max_workers, 0);
|
|
Yap_InitCPred("$thread_self", 1, p_thread_self, SafePredFlag);
|
|
Yap_InitCPred("$no_threads", 0, p_no_threads, SafePredFlag);
|
|
Yap_InitCPred("$max_threads", 1, p_max_threads, SafePredFlag);
|
|
Yap_InitCPred("$nof_threads", 1, p_nof_threads, SafePredFlag);
|
|
Yap_InitCPred("$nof_threads_created", 1, p_nof_threads_created, SafePredFlag);
|
|
Yap_InitCPred("$thread_stacks", 4, p_thread_stacks, SafePredFlag);
|
|
Yap_InitCPred("$thread_runtime", 1, p_thread_runtime, SafePredFlag);
|
|
Yap_InitCPred("$thread_unlock", 1, p_thread_unlock, SafePredFlag);
|
|
#if DEBUG_LOCKS||DEBUG_PE_LOCKS
|
|
Yap_InitCPred("debug_locks", 0, p_debug_locks, SafePredFlag);
|
|
Yap_InitCPred("nodebug_locks", 0, p_nodebug_locks, SafePredFlag);
|
|
#endif
|
|
}
|
|
|
|
|
|
#endif /* THREADS */
|
|
|
|
|
|
/**
|
|
@}
|
|
*/
|