more thread work to cean-up mutexes. Not finshed yet

This commit is contained in:
Vitor Santos Costa 2014-11-25 19:52:51 +00:00
parent afc6c5d04e
commit 624183b78e
8 changed files with 83 additions and 453 deletions

View File

@ -944,13 +944,13 @@ static Int init_current_predicate(USES_REGS1) {
if (IsExtensionFunctor(f)) {
Yap_Error(TYPE_ERROR_CALLABLE, t3, "current_predicate/2");
cut_fail();
return FALSE;
}
at = NameOfFunctor(f);
arity = ArityOfFunctor(f);
}
if (IsAtomTerm(t2)) // we know the module and the main predicate
// so that we are deterministic
{
if (arity == 0) {
if (Yap_GetPredPropByAtom(at, t2) != NIL &&
Yap_unify(ARG1, MkAtomTerm(at)))

View File

@ -880,21 +880,7 @@ typedef struct swi_mutex {
pthread_mutex_t m;
} SWIMutex;
static SWIMutex *MutexOfTerm(Term t)
{
Term t1 = Deref(t);
SWIMutex *mut = NULL;
if (IsVarTerm(t1)) {
} else if (IsAtomTerm(t1)) {
} else {
mut = AddressOfTerm(t1);
}
return mut;
}
static Int
p_new_mutex( USES_REGS1 )
{
static SWIMutex *NewMutex(void) {
SWIMutex* mutp;
pthread_mutexattr_t mat;
#if defined(HAVE_PTHREAD_MUTEXATTR_SETKIND_NP) && !defined(__MINGW32__)
@ -916,19 +902,49 @@ p_new_mutex( USES_REGS1 )
pthread_mutex_init(&mutp->m, &mat);
mutp->owners = 0;
mutp->tid_own = 0;
if (IsVarTerm((t1 = Deref(ARG1)))) {
return Yap_unify(t1, MkAddressTerm(mutp));
} else if(IsAtomTerm(t1)) {
return Yap_PutAtomMutex( AtomOfTerm(t1), mutp );
}
return mutp;
}
#define MutexOfTerm(t) MutexOfTerm__(t PASS_REGS)
static SWIMutex *MutexOfTerm__(Term t USES_REGS){
Term t1 = Deref(t);
SWIMutex *mut = NULL;
if (IsVarTerm(t1)) {
mut = NewMutex();
Yap_unify(MkAddressTerm(mut), ARG1);
} else if (IsIntegerTerm(t1)) {
mut = AddressOfTerm(t1);
} else if (IsAtomTerm(t1)) {
mut = Yap_GetMutexFromProp(AtomOfTerm(t1));
}
return mut;
}
static Int
p_destroy_mutex( USES_REGS1 )
p_new_mutex( USES_REGS1 ){
SWIMutex* mutp;
Term t1;
if (IsVarTerm((t1 = Deref(ARG1)))) {
if (!(mutp = NewMutex()))
return FALSE;
return Yap_unify(ARG1, MkAddressTerm(mutp));
} else if(IsAtomTerm(t1)) {
mutp = NewMutex( );
return Yap_PutAtomMutex( AtomOfTerm(t1), mutp );
} else if (IsAddressTerm(t1)) {
return FALSE;
}
return FALSE;
}
static Int p_destroy_mutex( USES_REGS1 )
{
SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
SWIMutex *mut = MutexOfTerm(Deref(ARG1));
if (!mut)
return FALSE;
if (pthread_mutex_destroy(&mut->m) < 0)
return FALSE;
Yap_FreeCodeSpace((void *)mut);
@ -938,8 +954,10 @@ p_destroy_mutex( USES_REGS1 )
static Int
p_lock_mutex( USES_REGS1 )
{
SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
SWIMutex *mut = MutexOfTerm(Deref(ARG1));
if (!mut)
return FALSE;
#if DEBUG_LOCKS
MUTEX_LOCK(&mut->m);
#else
@ -954,8 +972,10 @@ p_lock_mutex( USES_REGS1 )
static Int
p_trylock_mutex( USES_REGS1 )
{
SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
SWIMutex *mut = MutexOfTerm(Deref(ARG1));
if (!mut)
return FALSE;
if (MUTEX_TRYLOCK(&mut->m) == EBUSY)
return FALSE;
mut->owners++;
@ -966,8 +986,10 @@ p_trylock_mutex( USES_REGS1 )
static Int
p_unlock_mutex( USES_REGS1 )
{
SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
SWIMutex *mut = MutexOfTerm(Deref(ARG1));
if (!mut)
return FALSE;
#if DEBUG_LOCKS
MUTEX_UNLOCK(&mut->m);
#else
@ -981,7 +1003,6 @@ p_unlock_mutex( USES_REGS1 )
static Int
p_with_mutex( USES_REGS1 )
{
SWIMutex *mut;
Term t1 = Deref(ARG1), excep;
Int rc = FALSE;
Int creeping = Yap_get_signal(YAP_CREEP_SIGNAL);
@ -989,14 +1010,11 @@ p_with_mutex( USES_REGS1 )
Term tm = CurrentModule;
Term tg = Deref(ARG2);
if (IsVarTerm(t1)) {
p_new_mutex( PASS_REGS1 );
t1 = Deref(ARG1);
}
mut = (SWIMutex*)IntegerOfTerm(t1);
if (!p_lock_mutex( PASS_REGS1 )) {
return FALSE;
}
// this created the mutex
t1 = Deref(ARG1);
tg = Yap_StripModule(tg, &tm);
if (IsVarTerm(tg)) {
@ -1042,7 +1060,7 @@ p_with_mutex( USES_REGS1 )
rc = TRUE;
}
end:
ARG1 = MkIntegerTerm((Int)mut);
ARG1 = t1;
excep = Yap_GetException();
p_unlock_mutex( PASS_REGS1 );
if (creeping) {
@ -1073,251 +1091,23 @@ p_unlock_with_mutex( USES_REGS1 )
return p_unlock_mutex( PASS_REGS1 );
}
static Int
p_mutex_info( USES_REGS1 )
{
SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
SWIMutex *mut = MutexOfTerm(Deref(ARG1));
if (!mut)
return FALSE;
return Yap_unify(ARG2, MkIntegerTerm(mut->owners)) &&
Yap_unify(ARG3, MkIntegerTerm(mut->tid_own));
return TRUE;
}
static Int
p_cond_create( USES_REGS1 )
{
pthread_cond_t* condp;
condp = (pthread_cond_t *)Yap_AllocCodeSpace(sizeof(pthread_cond_t));
if (condp == NULL) {
return FALSE;
}
pthread_cond_init(condp, NULL);
return Yap_unify(ARG1, MkIntegerTerm((Int)condp));
}
typedef struct {
UInt indx;
mbox_t mbox;
} counted_mbox;
static Int
p_mbox_create( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = GLOBAL_named_mboxes;
if (IsVarTerm(namet)) {
AtomEntry *ae;
int new;
mbox_t mbox;
ae = Yap_lookupBlob(&mbox, sizeof(mbox), &PL_Message_Queue, &new);
namet = MkAtomTerm(RepAtom(ae));
mboxp = (mbox_t *)(ae->rep.blob[0].data);
Yap_unify(ARG1, namet);
LOCK(GLOBAL_mboxq_lock);
} else if (IsAtomTerm(namet)) {
LOCK(GLOBAL_mboxq_lock);
while( mboxp && mboxp->name != namet)
mboxp = mboxp->next;
if (mboxp) {
UNLOCK(GLOBAL_mboxq_lock);
=======
return Yap_unify(ARG1, MkAddressTerm(mutp));
}
static Int
p_destroy_mutex( USES_REGS1 )
{
Term t1 = Deref(ARG1);
SWIMutex *mut;
if (IsVarTerm(t1)) {
} else if (IsAtomTerm(t1)) {
} else {
mut = AddressOfTerm(Deref(ARG1));
if (pthread_mutex_destroy(&mut->m) < 0)
return FALSE;
Yap_FreeCodeSpace((void *)mut);
}
return TRUE;
}
static Int
p_lock_mutex( USES_REGS1 )
{
SWIMutex *mut = MutexOfTerm( ARG1 );
#if DEBUG_LOCKS
MUTEX_LOCK(&mut->m);
#else
if (MUTEX_LOCK(&mut->m) < 0)
return FALSE;
#endif
mut->owners++;
mut->tid_own = worker_id;
return TRUE;
}
static Int
p_trylock_mutex( USES_REGS1 )
{
SWIMutex *mut = MutexOfTerm( ARG1 );
if (MUTEX_TRYLOCK(&mut->m) == EBUSY)
return FALSE;
mut->owners++;
mut->tid_own = worker_id;
return TRUE;
}
static Int
p_unlock_mutex( USES_REGS1 )
{
SWIMutex *mut = MutexOfTerm( ARG1 );
#if DEBUG_LOCKS
MUTEX_UNLOCK(&mut->m);
#else
if (MUTEX_UNLOCK(&mut->m) < 0)
return FALSE;
#endif
mut->owners--;
return TRUE;
}
static Int
p_with_mutex( USES_REGS1 )
{
SWIMutex *mut;
Term t1 = Deref(ARG1), excep;
Int rc = FALSE;
Int creeping = Yap_get_signal(YAP_CREEP_SIGNAL);
PredEntry *pe;
Term tm = CurrentModule;
Term tg = Deref(ARG2);
if (IsVarTerm(t1)) {
p_new_mutex( PASS_REGS1 );
t1 = Deref(ARG1);
}
if (IsAtomTerm(t1)) {
} else {
mut = AddressOfTerm(Deref(ARG1));
if (FALSE && !p_lock_mutex( PASS_REGS1 )) {
return FALSE;
}
}
tg = Yap_StripModule(tg, &tm);
if (IsVarTerm(tg)) {
Yap_Error(INSTANTIATION_ERROR, ARG2, "with_mutex/2");
goto end;
} else if (IsApplTerm(tg)) {
register Functor f = FunctorOfTerm(tg);
register CELL *pt;
size_t i, arity;
f = FunctorOfTerm(tg);
if (IsExtensionFunctor(f)) {
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
goto end;
}
arity = ArityOfFunctor(f);
if (arity > MaxTemps) {
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
goto end;
}
pe = RepPredProp(PredPropByFunc(f, tm));
pt = RepAppl(tg)+1;
for (i= 0; i < arity; i++ )
XREGS[i+1] = pt[i];
} else if (IsAtomTerm(tg)) {
pe = RepPredProp(PredPropByAtom(AtomOfTerm(tg), tm));
} else if (IsPairTerm(tg)) {
register CELL *pt;
Functor f;
f = FunctorDot;
pe = RepPredProp(PredPropByFunc(f, tm));
pt = RepPair(tg);
XREGS[1] = pt[0];
XREGS[2] = pt[1];
} else {
Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
goto end;
}
if (
pe->OpcodeOfPred != FAIL_OPCODE &&
Yap_execute_pred(pe, NULL PASS_REGS) ) {
rc = TRUE;
}
end:
ARG1 = MkIntegerTerm((Int)mut);
excep = Yap_GetException();
if (FALSE) p_unlock_mutex( PASS_REGS1 );
if (creeping) {
Yap_signal( YAP_CREEP_SIGNAL );
} else if ( excep != 0) {
return Yap_JumpToEnv(excep);
}
return rc;
}
static Int
p_with_with_mutex( USES_REGS1 )
{
if (GLOBAL_WithMutex == NULL) {
p_new_mutex( PASS_REGS1 );
GLOBAL_WithMutex = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
} else {
ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex);
}
return p_lock_mutex( PASS_REGS1 );
}
static Int
p_unlock_with_mutex( USES_REGS1 )
{
ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex);
return p_unlock_mutex( PASS_REGS1 );
}
static Int
p_mutex_info( USES_REGS1 )
{
SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
return Yap_unify(ARG2, MkIntegerTerm(mut->owners)) &&
Yap_unify(ARG3, MkIntegerTerm(mut->tid_own));
return TRUE;
}
static Int
p_cond_create( USES_REGS1 )
{
pthread_cond_t* condp;
condp = (pthread_cond_t *)Yap_AllocCodeSpace(sizeof(pthread_cond_t));
if (condp == NULL) {
return FALSE;
}
pthread_cond_init(condp, NULL);
return Yap_unify(ARG1, MkIntegerTerm((Int)condp));
}
typedef struct {
UInt indx;
mbox_t mbox;
} counted_mbox;
static Int
p_mbox_create( USES_REGS1 )
{
@ -1356,28 +1146,6 @@ p_mbox_create( USES_REGS1 )
return rc;
}
static Int
p_mbox_destroy( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = GLOBAL_named_mboxes, *prevp;
if (IsVarTerm(namet) )
return FALSE;
}
mboxp = (mbox_t *)Yap_AllocCodeSpace(sizeof(mbox_t));
if (mboxp == NULL) {
UNLOCK(GLOBAL_mboxq_lock);
return FALSE;
}
// global mbox, for now we'll just insert in list
mboxp->next = GLOBAL_named_mboxes;
GLOBAL_named_mboxes = mboxp;
}
bool rc = mboxCreate( namet, mboxp PASS_REGS );
UNLOCK(GLOBAL_mboxq_lock);
return rc;
}
static Int
p_mbox_destroy( USES_REGS1 )
@ -1408,26 +1176,6 @@ p_mbox_destroy( USES_REGS1 )
UNLOCK(GLOBAL_mboxq_lock);
mboxDestroy(mboxp PASS_REGS);
Yap_FreeCodeSpace( (char *)mboxp );
=======
}
LOCK(GLOBAL_mboxq_lock);
prevp = NULL;
while( mboxp && mboxp->name != namet) {
prevp = mboxp;
mboxp = mboxp->next;
}
if (!mboxp) {
UNLOCK(GLOBAL_mboxq_lock);
return FALSE;
}
if (mboxp == GLOBAL_named_mboxes) {
GLOBAL_named_mboxes = mboxp->next;
} else {
prevp->next = mboxp->next;
}
UNLOCK(GLOBAL_mboxq_lock);
mboxDestroy(mboxp PASS_REGS);
Yap_FreeCodeSpace( (char *)mboxp );
return TRUE;
}
@ -1520,7 +1268,20 @@ p_mbox_destroy( USES_REGS1 )
return mboxPeek(mboxp, Deref(ARG2) PASS_REGS);
}
static Int
static Int
p_cond_create( USES_REGS1 )
{
pthread_cond_t* condp;
condp = (pthread_cond_t *)Yap_AllocCodeSpace(sizeof(pthread_cond_t));
if (condp == NULL) {
return FALSE;
}
pthread_cond_init(condp, NULL);
return Yap_unify(ARG1, MkIntegerTerm((Int)condp));
}
static Int
p_cond_destroy( USES_REGS1 )
{
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
@ -1560,136 +1321,6 @@ p_mbox_destroy( USES_REGS1 )
return TRUE;
}
static mbox_t*
getMbox(Term t)
{
mbox_t* mboxp;
if (IsAtomTerm(t=Deref(t))) {
Atom at = AtomOfTerm(t);
LOCK(GLOBAL_mboxq_lock);
if (IsBlob(at)) {
mboxp = (mbox_t *)(RepAtom(at)->rep.blob[0].data);
} else {
mboxp = GLOBAL_named_mboxes;
while( mboxp && mboxp->name != t) {
mboxp = mboxp->next;
}
}
if (!mboxp->open)
mboxp = NULL;
if (mboxp) {
pthread_mutex_lock(& mboxp->mutex);
}
UNLOCK(GLOBAL_mboxq_lock);
} else if (IsIntTerm(t)) {
int wid = IntOfTerm(t);
if (REMOTE(wid) &&
(REMOTE_ThreadHandle(wid).in_use || REMOTE_ThreadHandle(wid).zombie))
{
return &REMOTE_ThreadHandle(wid).mbox_handle;
} else {
return NULL;
}
if (!mboxp->open)
mboxp = NULL;
if (mboxp) {
pthread_mutex_lock(& mboxp->mutex);
}
} else {
return NULL;
}
return mboxp;
}
static Int
p_mbox_send( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = getMbox(namet) ;
if (!mboxp)
return FALSE;
return mboxSend(mboxp, Deref(ARG2) PASS_REGS);
}
static Int
p_mbox_size( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = getMbox(namet) ;
if (!mboxp)
return FALSE;
return Yap_unify( ARG2, MkIntTerm(mboxp->nmsgs));
}
static Int
p_mbox_receive( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = getMbox(namet) ;
if (!mboxp)
return FALSE;
return mboxReceive(mboxp, Deref(ARG2) PASS_REGS);
}
static Int
p_mbox_peek( USES_REGS1 )
{
Term namet = Deref(ARG1);
mbox_t* mboxp = getMbox(namet) ;
if (!mboxp)
return FALSE;
return mboxPeek(mboxp, Deref(ARG2) PASS_REGS);
}
static Int
p_cond_destroy( USES_REGS1 )
{
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
if (pthread_cond_destroy(condp) < 0)
return FALSE;
Yap_FreeCodeSpace((void *)condp);
return TRUE;
}
static Int
p_cond_signal( USES_REGS1 )
{
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
if (pthread_cond_signal(condp) < 0)
return FALSE;
return TRUE;
}
static Int
p_cond_broadcast( USES_REGS1 )
{
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
if (pthread_cond_broadcast(condp) < 0)
return FALSE;
else
return TRUE;
}
static Int
p_cond_wait( USES_REGS1 )
{
pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG2));
pthread_cond_wait(condp, &mut->m);
return TRUE;
}
static Int
p_thread_stacks( USES_REGS1 )
{ /* '$thread_signal'(+P) */

View File

@ -1333,8 +1333,8 @@ IsTranslationProperty (int flags)
bool Yap_PutAtomMutex(Atom a, void *ptr);
/* get mutex prop for atom; */
static inline MutexEntry *
Yap_GetMutexProp(Atom at)
static inline void *
Yap_GetMutexFromProp(Atom at)
{
Prop p0;
AtomEntry *ae = RepAtom(at);

View File

@ -64,7 +64,7 @@ static inline int si_callback(void *key, void *data, void *arg)
/* Judy1 integer sparse set intersection */
static inline int j1_callback(void *key, void *data, void *arg)
{
int r;
intptr_t r;
Pvoid_t *array = (Pvoid_t *) arg;
J1S(r, *array, (int) data);
if (r == JERR)

View File

@ -45,7 +45,7 @@ INFODIR=$(SHAREDIR)/info
CC=@CC@
CPP=@CPP@
DEFS=@DEFS@
CPPFLAGS=@CPPFLAGS@ -I../.. -I$(srcdir)/../../H
CPPFLAGS=@CPPFLAGS@ -I../.. -I$(srcdir)/../../H -I$(srcdir)/../../include
CFLAGS= @SHLIB_CFLAGS@ $(DEFS) $(CPPFLAGS)
LIBS=@LIBS@
LDFLAGS=@LDFLAGS@
@ -106,7 +106,7 @@ mpe.o: $(srcdir)/mpe.c
depend: $(HEADERS) $(C_SOURCES)
-@if test "$(GCC)" = yes; then\
$(CC) -MM $(CFLAGS) -I$(srcdir)/include $(C_SOURCES) > .depend;\
$(CC) -MM $(CFLAGS) -I$(srcdir)/include -i ../.. $(C_SOURCES) > .depend;\
else\
makedepend -f - -- $(CFLAGS) -I$(srcdir)/include -- $(C_SOURCES) |\
sed 's|.*/\([^:]*\):|\1:|' > .depend ;\

View File

@ -2384,6 +2384,7 @@ argument to wait()
#endif /*HAVE_SYS_WAIT_H*/
typedef sighandler_t sigf_t;
int
System(char *cmd)
@ -2391,9 +2392,8 @@ System(char *cmd)
int pid;
char *shell = "/bin/sh";
int rval;
void (*old_int)();
void (*old_stop)();
sigf_t old_int, old_stop;
if ( (pid = fork()) == -1 )
{ return PL_error("shell", 2, OsError(), ERR_SYSCALL, "fork");
} else if ( pid == 0 ) /* The child */
@ -2407,9 +2407,9 @@ System(char *cmd)
{ wait_t status; /* the parent */
int n;
old_int = signal(SIGINT, SIG_IGN);
old_int = (sigf_t)signal(SIGINT, SIG_IGN);
#ifdef SIGTSTP
old_stop = signal(SIGTSTP, SIG_DFL);
old_stop = (sigf_t)signal(SIGTSTP, SIG_DFL);
#endif /* SIGTSTP */
for(;;)

View File

@ -140,7 +140,7 @@ static int S__seterror(IOSTREAM *s);
#ifdef O_PLMT
#define SLOCK(s) if ( s->mutex ) recursiveMutexLock(s->mutex)
#define SUNLOCK(s) if ( s->mutex ) recursiveMutexUnlock(s->mutex)
inline int
static inline int
STRYLOCK(IOSTREAM *s)
{ if ( s->mutex &&
recursiveMutexTryLock(s->mutex) == EBUSY )

View File

@ -1562,4 +1562,3 @@ thread_local(X) :-
/**
@}
*/
>