From 186adc20530d58dc2dd54ad4b3bd016db9c9985a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADtor=20Santos=20Costa?= Date: Tue, 14 Oct 2014 15:53:24 +0100 Subject: [PATCH] More thread fixes, including true anonymous mqueues, worker_id for sequential, support for arithmetic exceptions on mac, fixes for with_mutex, fixes for dangling pointers in thread termination. Uuufff... --- C/eval.c | 41 ++++++++++++++- C/pl-yap.c | 11 ++-- C/threads.c | 93 +++++++++++++++++++++++++++++---- H/Regs.h | 1 + H/Yapproto.h | 1 + H/dglobals.h | 2 + H/eval.h | 11 ++-- H/hglobals.h | 2 + H/iglobals.h | 2 + H/rglobals.h | 2 + configure.in | 4 +- library/dialect/swi/fli/blobs.c | 9 ++-- misc/GLOBALS | 2 + packages/prosqlite/Makefile | 85 ------------------------------ pl/boot.yap | 1 - pl/errors.yap | 2 +- pl/flags.yap | 2 +- pl/threads.yap | 79 ++++++++++------------------ 18 files changed, 186 insertions(+), 164 deletions(-) delete mode 100644 packages/prosqlite/Makefile diff --git a/C/eval.c b/C/eval.c index 83c02ac41..48f750a35 100644 --- a/C/eval.c +++ b/C/eval.c @@ -39,6 +39,9 @@ static char SccsId[] = "%W% %G%"; #if HAVE_UNISTD_H #include #endif +#if HAVE_FENV_H +#include +#endif static Term Eval(Term t1 USES_REGS); @@ -92,6 +95,7 @@ get_matrix_element(Term t1, Term t2 USES_REGS) static Term Eval(Term t USES_REGS) { + if (IsVarTerm(t)) { LOCAL_ArithError = TRUE; return Yap_ArithError(INSTANTIATION_ERROR,t,"in arithmetic"); @@ -167,12 +171,45 @@ Eval(Term t USES_REGS) } } + +#if HAVE_FENV_H Term -Yap_InnerEval(Term t) +Yap_InnerEval__(Term t USES_REGS) +{ +#pragma STDC FENV_ACCESS ON + int raised; + Term ret; + + feclearexcept(FE_ALL_EXCEPT); + ret = Eval(t PASS_REGS); + if ( ret && (raised = fetestexcept( FE_DIVBYZERO | FE_OVERFLOW | FE_UNDERFLOW)) ) { + + feclearexcept(FE_ALL_EXCEPT); + if (raised & FE_OVERFLOW) { + LOCAL_Error_TYPE = EVALUATION_ERROR_FLOAT_OVERFLOW; + } else if (raised & (FE_INVALID|FE_INEXACT)) { + LOCAL_Error_TYPE = EVALUATION_ERROR_UNDEFINED; + } else if (raised & FE_DIVBYZERO) { + LOCAL_Error_TYPE = EVALUATION_ERROR_ZERO_DIVISOR; + } else if (raised & FE_UNDERFLOW) { + LOCAL_Error_TYPE = EVALUATION_ERROR_FLOAT_UNDERFLOW; + } else { + LOCAL_Error_TYPE = EVALUATION_ERROR_UNDEFINED; + } + LOCAL_Error_Term = t; + LOCAL_ErrorMessage="Arithmetic Exception"; + return 0L; + } + return ret; +} +#else +Term +Yap_InnerEval__(Term t USES_REGS) { CACHE_REGS return Eval(t PASS_REGS); } +#endif #ifdef BEAM Int BEAM_is(void); @@ -212,7 +249,7 @@ p_is( USES_REGS1 ) { /* X is Y */ Term out = 0L; - while (!(out = Eval(Deref(ARG2) PASS_REGS))) { + while (!(out = Yap_InnerEval(Deref(ARG2)))) { if (LOCAL_Error_TYPE == RESOURCE_ERROR_STACK) { LOCAL_Error_TYPE = YAP_NO_ERROR; if (!Yap_gcl(LOCAL_Error_Size, 2, ENV, CP)) { diff --git a/C/pl-yap.c b/C/pl-yap.c index ddbe5626f..b888170c1 100755 --- a/C/pl-yap.c +++ b/C/pl-yap.c @@ -152,14 +152,14 @@ callProlog(module_t module, term_t goal, int flags, term_t *ex ) } } -extern YAP_Term Yap_InnerEval(YAP_Term t); +extern YAP_Term Yap_InnerEval__(YAP_Term t USES_REGS); inline static YAP_Term -Yap_Eval(YAP_Term t) +Yap_Eval(YAP_Term t USES_REGS) { - if (t == 0L || ( !YAP_IsVarTerm(t) && (YAP_IsIntTerm(t) || YAP_IsFloatTerm(t)) )) + if (t == 0L || ( !YAP_IsVarTerm(t) && (YAP_IsIntTerm(t) || YAP_IsFloatTerm(t)) ) ) return t; - return Yap_InnerEval(t); + return Yap_InnerEval__(t PASS_REGS); } IOENC @@ -196,7 +196,8 @@ PL_qualify(term_t raw, term_t qualified) int valueExpression(term_t t, Number r ARG_LD) { - YAP_Term t0 = Yap_Eval(YAP_GetFromSlot(t)); + REGS_FROM_LD + YAP_Term t0 = Yap_Eval(Yap_GetFromSlot(t PASS_REGS) PASS_REGS); if (YAP_IsIntTerm(t0)) { r->type = V_INTEGER; r->value.i = YAP_IntOfTerm(t0); diff --git a/C/threads.c b/C/threads.c index 987ec6d6e..93f5167ae 100644 --- a/C/threads.c +++ b/C/threads.c @@ -30,6 +30,7 @@ static char SccsId[] = "%W% %G%"; #include "yapio.h" #include "pl-shared.h" #include +#include #if HAVE_STRING_H #include #endif @@ -37,6 +38,18 @@ static char SccsId[] = "%W% %G%"; #include "tab.macros.h" #endif /* TABLING */ + +PL_blob_t PL_Message_Queue = { + PL_BLOB_MAGIC, + PL_BLOB_UNIQUE | PL_BLOB_NOCOPY, + "message_queue", + 0, // release + 0, // compare + 0, // write + 0 // acquire +}; + + #if DEBUG_LOCKS int debug_locks = TRUE; @@ -131,6 +144,7 @@ allocate_new_tid(void) return new_worker_id; } + static bool mboxCreate( Term namet, mbox_t *mboxp USES_REGS ) { @@ -884,6 +898,7 @@ p_new_mutex( USES_REGS1 ) return Yap_unify(ARG1, MkIntegerTerm((Int)mutp)); } + static Int p_destroy_mutex( USES_REGS1 ) { @@ -938,6 +953,30 @@ p_unlock_mutex( USES_REGS1 ) return TRUE; } +static Int +p_with_with_mutex( USES_REGS1 ) +{ + if (GLOBAL_WithMutex == NULL) { + Term t = ARG1; + LOCK(GLOBAL_ThreadHandlesLock); + p_new_mutex( PASS_REGS1 ); + GLOBAL_WithMutex = (SWIMutex*)IntegerOfTerm(Deref(ARG1)); + UNLOCK(GLOBAL_ThreadHandlesLock); + } else { + ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex); + } + return p_lock_mutex( PASS_REGS1 ); +} + +static Int +p_unlock_with_mutex( USES_REGS1 ) +{ + ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex); + return p_unlock_mutex( PASS_REGS1 ); +} + + + static Int p_mutex_info( USES_REGS1 ) { @@ -961,6 +1000,10 @@ p_cond_create( USES_REGS1 ) return Yap_unify(ARG1, MkIntegerTerm((Int)condp)); } +typedef struct { + UInt indx; + mbox_t mbox; +} counted_mbox; static Int p_mbox_create( USES_REGS1 ) @@ -969,12 +1012,15 @@ p_mbox_create( USES_REGS1 ) mbox_t* mboxp = GLOBAL_named_mboxes; if (IsVarTerm(namet)) { - char buf[256]; - sprintf(buf, "$%p", mboxp); - namet = MkAtomTerm(Yap_FullLookupAtom(buf)); - Yap_unify(ARG1, namet); - } - if (IsAtomTerm(namet)) { + AtomEntry *ae; + counted_mbox c; + int new; + c.indx = GLOBAL_mbox_count++; + ae = Yap_lookupBlob(&c, sizeof(c), &PL_Message_Queue, &new); + namet = MkAtomTerm(RepAtom(ae)); + mboxp = &(((counted_mbox *)(ae->rep.blob[0].data))->mbox); + Yap_unify(ARG1, namet); + } else if (IsAtomTerm(namet)) { LOCK(GLOBAL_mboxq_lock); while( mboxp && mboxp->name != namet) mboxp = mboxp->next; @@ -1000,8 +1046,11 @@ p_mbox_destroy( USES_REGS1 ) Term namet = Deref(ARG1); mbox_t* mboxp = GLOBAL_named_mboxes, *prevp; - if (IsVarTerm(namet) || !IsAtomTerm(namet)) - return FALSE; + if (IsVarTerm(namet) ) + return FALSE; + if (IsIntTerm(namet) ) { + return FALSE; + } LOCK(GLOBAL_mboxq_lock); prevp = NULL; while( mboxp && mboxp->name != namet) { @@ -1027,12 +1076,18 @@ static mbox_t* getMbox(Term t) { mbox_t* mboxp; - if (IsAtomTerm(t)) { + if (IsAtomTerm(t=Deref(t))) { + Atom at = AtomOfTerm(t); + if (IsBlob(at)) { + mboxp = &(((counted_mbox *)(RepAtom(at)->rep.blob[0].data))->mbox); + LOCK(GLOBAL_mboxq_lock); + } else { LOCK(GLOBAL_mboxq_lock); mboxp = GLOBAL_named_mboxes; while( mboxp && mboxp->name != t) { mboxp = mboxp->next; } + } } else if (IsIntTerm(t)) { int wid = IntOfTerm(t); if (REMOTE(wid) && @@ -1059,9 +1114,22 @@ p_mbox_send( USES_REGS1 ) Term namet = Deref(ARG1); mbox_t* mboxp = getMbox(namet) ; + if (!mboxp) + return FALSE; return mboxSend(mboxp, Deref(ARG2) PASS_REGS); } +static Int +p_mbox_size( USES_REGS1 ) +{ + Term namet = Deref(ARG1); + mbox_t* mboxp = getMbox(namet) ; + + if (!mboxp) + return FALSE; + return Yap_unify( ARG2, MkIntTerm(mboxp->nclients)); +} + static Int p_mbox_receive( USES_REGS1 ) @@ -1069,6 +1137,8 @@ p_mbox_receive( USES_REGS1 ) Term namet = Deref(ARG1); mbox_t* mboxp = getMbox(namet) ; + if (!mboxp) + return FALSE; return mboxReceive(mboxp, Deref(ARG2) PASS_REGS); } @@ -1079,6 +1149,8 @@ p_mbox_peek( USES_REGS1 ) Term namet = Deref(ARG1); mbox_t* mboxp = getMbox(namet) ; + if (!mboxp) + return FALSE; return mboxPeek(mboxp, Deref(ARG2) PASS_REGS); } @@ -1339,6 +1411,8 @@ and succeeds silently. Yap_InitCPred("$lock_mutex", 1, p_lock_mutex, SafePredFlag); Yap_InitCPred("$trylock_mutex", 1, p_trylock_mutex, SafePredFlag); Yap_InitCPred("$unlock_mutex", 1, p_unlock_mutex, SafePredFlag); + Yap_InitCPred("$with_with_mutex", 1, p_with_with_mutex, 0); + Yap_InitCPred("$unlock_with_mutex", 1, p_unlock_with_mutex, 0); Yap_InitCPred("$mutex_info", 3, p_mutex_info, SafePredFlag); Yap_InitCPred("$cond_create", 1, p_cond_create, SafePredFlag); Yap_InitCPred("$cond_destroy", 1, p_cond_destroy, SafePredFlag); @@ -1349,6 +1423,7 @@ and succeeds silently. Yap_InitCPred("$message_queue_destroy", 1, p_mbox_destroy, SafePredFlag); Yap_InitCPred("$message_queue_send", 2, p_mbox_send, SafePredFlag); Yap_InitCPred("$message_queue_receive", 2, p_mbox_receive, SafePredFlag); + Yap_InitCPred("$message_queue_size", 2, p_mbox_size, SafePredFlag); Yap_InitCPred("$message_queue_peek", 2, p_mbox_peek, SafePredFlag); Yap_InitCPred("$thread_stacks", 4, p_thread_stacks, SafePredFlag); Yap_InitCPred("$signal_thread", 1, p_thread_signal, SafePredFlag); diff --git a/H/Regs.h b/H/Regs.h index 2f1ff1f61..056b7b4f5 100755 --- a/H/Regs.h +++ b/H/Regs.h @@ -658,6 +658,7 @@ INLINE_ONLY EXTERN inline void restore_B(void) { #define frame_tail Yap_REGS.frame_tail_ #endif /* YAPOR_SBA */ #else +#define worker_id 0 #define LOCAL (&Yap_local) #endif /* YAPOR || THREADS */ #define CurrentModule Yap_REGS.CurrentModule_ diff --git a/H/Yapproto.h b/H/Yapproto.h index 8ec1f9264..1d245e506 100755 --- a/H/Yapproto.h +++ b/H/Yapproto.h @@ -449,6 +449,7 @@ void Yap_swi_install(void); void Yap_InitSWIHash(void); int Yap_get_stream_handle(Term, int, int, void *); Term Yap_get_stream_position(void *); +AtomEntry *Yap_lookupBlob(void *blob, size_t len, void *type, int *newp); /* opt.preds.c */ void Yap_init_optyap_preds(void); diff --git a/H/dglobals.h b/H/dglobals.h index 2abfde968..4b0f9fb27 100644 --- a/H/dglobals.h +++ b/H/dglobals.h @@ -63,6 +63,8 @@ #define GLOBAL_master_thread Yap_global->master_thread_ #define GLOBAL_named_mboxes Yap_global->named_mboxes_ #define GLOBAL_mboxq_lock Yap_global->mboxq_lock_ +#define GLOBAL_mbox_count Yap_global->mbox_count_ +#define GLOBAL_WithMutex Yap_global->WithMutex_ #endif /* THREADS */ #define GLOBAL_stdout Yap_global->stdout_ diff --git a/H/eval.h b/H/eval.h index f23049dca..0b8b05a0c 100644 --- a/H/eval.h +++ b/H/eval.h @@ -358,15 +358,18 @@ Term Yap_eval_atom(Int); Term Yap_eval_unary(Int,Term); Term Yap_eval_binary(Int,Term,Term); -Term Yap_InnerEval(Term); +Term Yap_InnerEval__(Term USES_REGS); Int Yap_ArithError(yap_error_number,Term,char *msg, ...); #include "inline-only.h" -INLINE_ONLY inline EXTERN Term -Yap_Eval(Term t); + +#define Yap_InnerEval(x) Yap_InnerEval__(x PASS_REGS) +#define Yap_Eval(x) Yap_Eval__(x PASS_REGS) + +INLINE_ONLY inline EXTERN Term Yap_Eval__(Term t USES_REGS); INLINE_ONLY inline EXTERN Term -Yap_Eval(Term t) +Yap_Eval__(Term t USES_REGS) { if (t == 0L || ( !IsVarTerm(t) && IsNumTerm(t) )) return t; diff --git a/H/hglobals.h b/H/hglobals.h index eea32423f..b4488d49f 100644 --- a/H/hglobals.h +++ b/H/hglobals.h @@ -63,6 +63,8 @@ typedef struct global_data { pthread_t master_thread_; struct thread_mbox* named_mboxes_; lockvar mboxq_lock_; + UInt mbox_count_; + struct swi_mutex* WithMutex_; #endif /* THREADS */ struct io_stream* stdout_; diff --git a/H/iglobals.h b/H/iglobals.h index 67e8d3d0f..c2b3192c1 100644 --- a/H/iglobals.h +++ b/H/iglobals.h @@ -63,6 +63,8 @@ static void InitGlobal(void) { GLOBAL_named_mboxes = NULL; INIT_LOCK(GLOBAL_mboxq_lock); + GLOBAL_mbox_count = 0; + #endif /* THREADS */ GLOBAL_stdout = Soutput; diff --git a/H/rglobals.h b/H/rglobals.h index dce458f87..3f7d2cdd4 100644 --- a/H/rglobals.h +++ b/H/rglobals.h @@ -63,6 +63,8 @@ static void RestoreGlobal(void) { REINIT_LOCK(GLOBAL_mboxq_lock); + + #endif /* THREADS */ diff --git a/configure.in b/configure.in index cfd404258..a4178c8cb 100755 --- a/configure.in +++ b/configure.in @@ -1551,7 +1551,7 @@ AC_TRY_COMPILE( #include #include , - printf("SIGINFO value is %d\n", SA_SIGINFO); + printf("SIGINFO value is %d\n", SIGINFO); , yap_cv_siginfo=yes,yap_cv_siginfo=no)]) AC_MSG_RESULT($yap_cv_siginfo) @@ -1567,7 +1567,7 @@ AC_TRY_COMPILE( #include #include , - printf("SIGFPE value is %d\n", SA_SIGFPE); + printf("SIGFPE value is %d\n", SIGFPE); , yap_cv_sigfpe=yes,yap_cv_sigfpe=no)]) AC_MSG_RESULT($yap_cv_sigfpe) diff --git a/library/dialect/swi/fli/blobs.c b/library/dialect/swi/fli/blobs.c index 537c840cc..1d64d06c3 100644 --- a/library/dialect/swi/fli/blobs.c +++ b/library/dialect/swi/fli/blobs.c @@ -76,11 +76,12 @@ PL_is_blob(term_t t, PL_blob_t **type) /* } */ /* } */ -static AtomEntry * -lookupBlob(void *blob, size_t len, PL_blob_t *type, int *new) +AtomEntry * +Yap_lookupBlob(void *blob, size_t len, void *type0, int *new) { BlobPropEntry *b; AtomEntry *ae; + PL_blob_t *type = type0; if (new) *new = FALSE; @@ -136,7 +137,7 @@ PL_unify_blob(term_t t, void *blob, size_t len, PL_blob_t *type) if (!blob) return FALSE; - ae = lookupBlob(blob, len, type, NULL); + ae = Yap_lookupBlob(blob, len, type, NULL); if (!ae) { return FALSE; } @@ -155,7 +156,7 @@ PL_put_blob(term_t t, void *blob, size_t len, PL_blob_t *type) if (!blob) return FALSE; - ae = lookupBlob(blob, len, type, & ret); + ae = Yap_lookupBlob(blob, len, type, & ret); if (!ae) { return FALSE; } diff --git a/misc/GLOBALS b/misc/GLOBALS index 8c503d1f4..3401821f4 100755 --- a/misc/GLOBALS +++ b/misc/GLOBALS @@ -71,6 +71,8 @@ int PrologShouldHandleInterrupts void pthread_t master_thread void struct thread_mbox* named_mboxes =NULL lockvar mboxq_lock MkLock +UInt mbox_count =0 +struct swi_mutex* WithMutex void #endif /* THREADS */ // streams diff --git a/packages/prosqlite/Makefile b/packages/prosqlite/Makefile deleted file mode 100644 index f0c904843..000000000 --- a/packages/prosqlite/Makefile +++ /dev/null @@ -1,85 +0,0 @@ -################################################################ -# Makefile template for SWI-Prolog PROSQLITE interface -# -# This template is used by configure to create Makefile. See -# the file INSTALL for further installation instructions. -# -# License: LGPL -# -# Author: Nicos Angelopoulos & Jan Wielemaker (jan@swi.psy.uva.nl) -################################################################ - -PACKAGE=prosqlite -DOC=prosqlite -include ../Makefile.defs - -CFLAGS+= -I/usr/include -I. -LDSOFLAGS+= -L/usr/lib - -LIBS=-lgmp -lreadline -lncurses -lpthread -lresolv -lnss_dns -lnss_files -lcrypt -lstdc++ -lm -L/u/vitor/lib -ldl -lnsl -NETLIBS=@NETLIBS@ - -LIBPL= prolog/prosqlite.pl -TARGETS= prosqlite.so - -PROSQLITEOBJ= prosqlite.o - -all: $(TARGETS) - -nolib:: - @echo "WARNING: Could not find sqlite library; skipped" - - -prosqlite.o: $(srcdir)/c/prosqlite.c - $(CC) -c $(CFLAGS) $< -o $@ - -prosqlite.so: $(PROSQLITEOBJ) - $(LD) $(LDSOFLAGS) -o $@ $(AROBJ) -lsqlite3 -lgmp -lreadline -lncurses -lpthread -lresolv -lnss_dns -lnss_files -lcrypt -lstdc++ -lm -L/u/vitor/lib -ldl -lnsl $(LIBPLSO) - -install: $(TARGETS) $(addprefix $(srcdir)/, $(LIBPL)) install-examples - mkdir -p $(DESTDIR)$(SOLIBDIR) - for f in $(TARGETS); do \ - [ "$$f" = nolib ] || $(INSTALL_PROGRAM) $$f $(DESTDIR)$(SOLIBDIR); \ - done - mkdir -p $(DESTDIR)$(PLLIBDIR) - for f in $(LIBPL); do \ - $(INSTALL_DATA) $(srcdir)/$$f $(DESTDIR)$(PLLIBDIR); \ - done - $(MKINDEX) - -ln-install:: - @$(MAKE) INSTALL_DATA='../ln-install' INSTALL_PROGRAM='../ln-install' install - -rpm-install: install - -html-install:: - mkdir -p $(DESTDIR)$(PKGDOCDIR) - $(INSTALL) -m 644 $(DOC).html $(DESTDIR)$(PKGDOCDIR) - -pdf-install:: - mkdir -p $(DESTDIR)$(PKGDOCDIR) - $(INSTALL) -m 644 $(DOC).pdf $(DESTDIR)$(PKGDOCDIR) - -nnuninstall:: - (cd $(SOLIBDIR) && rm -f $(TARGETS)) - (cd $(PLLIBDIR) && rm -f $(LIBPL)) - $(PL) -f none -g make -t halt - -################################################################ -# Check -################################################################ - -check:: - - -################################################################ -# Clean -################################################################ - -clean: - rm -f $(AROBJ) *~ *.o *% a.out core config.log - -distclean: clean - rm -f $(TARGETS) config.cache config.h config.status Makefile - rm -f $(DOC).aux $(DOC).log $(DOC).out $(DOC).toc - rm -rf autom4te.cache diff --git a/pl/boot.yap b/pl/boot.yap index 4f3365c65..e5c43d958 100644 --- a/pl/boot.yap +++ b/pl/boot.yap @@ -329,7 +329,6 @@ true :- true. '$init_system' :- get_value('$yap_inited', true), !. '$init_system' :- - '$set_fpu_exceptions'(true), set_value('$yap_inited', true), % do catch as early as possible ( diff --git a/pl/errors.yap b/pl/errors.yap index b7db11e0c..0ae258530 100644 --- a/pl/errors.yap +++ b/pl/errors.yap @@ -248,7 +248,7 @@ to allow user-control. Level \= top, !, throw(error(permission_error(module,redefined,A),B)). '$process_error'(error(Msg, Where), _) :- !, - '$set_fpu_exceptions', + '$set_fpu_exceptions'(true), print_message(error,error(Msg, Where)). '$process_error'(Throw, _) :- print_message(error,error(unhandled_exception,Throw)). diff --git a/pl/flags.yap b/pl/flags.yap index 0507d3aaa..7656b33e8 100644 --- a/pl/flags.yap +++ b/pl/flags.yap @@ -1147,7 +1147,7 @@ yap_flag(max_threads,X) :- '$syntax_check_multiple'(_,off), '$swi_set_prolog_flag'(character_escapes, false), % disable character escapes. '$set_yap_flags'(14,1), -% '$set_fpu_exceptions'(false), + '$set_fpu_exceptions'(true), unknown(_,fail). '$adjust_language'(sicstus) :- '$switch_log_upd'(1), diff --git a/pl/threads.yap b/pl/threads.yap index f58267caf..c54c906fb 100644 --- a/pl/threads.yap +++ b/pl/threads.yap @@ -121,21 +121,12 @@ volatile(P) :- fail. '$init_thread0' :- '$no_threads', !. -'$init_thread0' :- - mutex_create(WMId), - assert_static(prolog:'$with_mutex_mutex'(WMId) ), - fail. '$init_thread0' :- recorda('$thread_defaults', [0, 0, 0, false, true], _). '$reinit_thread0' :- '$no_threads', !. -'$reinit_thread0' :- fail, - abolish(prolog:'$with_mutex_mutex'/1), - fail. -'$reinit_thread0' :- - mutex_create(WMId), - asserta_static( ( prolog:'$with_mutex_mutex'(WMId) :- !) ). +'$reinit_thread0'. '$top_thread_goal'(G, Detached) :- @@ -1096,15 +1087,14 @@ with_mutex(M, G) :- '$do_error'(type_error(callable,G),with_mutex(M, G)) ; atom(M) -> - '$with_mutex_mutex'(WMId), - '$lock_mutex'(WMId), + '$with_with_mutex'(WMId), ( recorded('$mutex_alias',[Id|M],_) -> true ; '$new_mutex'(Id), recorda('$mutex_alias',[Id|M],_) ), - '$unlock_mutex'(WMId), '$lock_mutex'(Id), + '$unlock_with_mutex'(WMId), ( catch('$execute'(G), E, ('$unlock_mutex'(Id), throw(E))) -> '$unlock_mutex'(Id) ; '$unlock_mutex'(Id), @@ -1176,13 +1166,19 @@ message_queue_create(Id, Options) :- var(Options), !, '$do_error'(instantiation_error, message_queue_create(Id, Options)). message_queue_create(Id, []) :- !, - '$do_msg_queue_create'(Id). + '$message_queue_create'(Id). message_queue_create(Id, [alias(Alias)]) :- var(Alias), !, '$do_error'(instantiation_error, message_queue_create(Id, [alias(Alias)])). message_queue_create(Id, [alias(Alias)]) :- \+ atom(Alias), !, '$do_error'(type_error(atom,Alias), message_queue_create(Id, [alias(Alias)])). +message_queue_create(Id, [alias(Alias)]) :- var(Id), !, + ( recorded('$thread_alias', [_|Alias], _) -> + '$do_error'(permission_error(create,queue,alias(Alias)),message_queue_create(Alias, [alias(Alias)])) + ; '$message_queue_create'(Id), + recordz('$thread_alias', [Id|Alias], _) + ). message_queue_create(Alias, [alias(Alias)]) :- !, ( recorded('$thread_alias', [_|Alias], _) -> '$do_error'(permission_error(create,queue,alias(Alias)),message_queue_create(Alias, [alias(Alias)])) @@ -1229,45 +1225,28 @@ message_queue_destroy(Name) :- fail. message_queue_destroy(_). -message_queue_property(Id, Prop) :- - ( nonvar(Id) -> - '$check_message_queue_or_alias'(Id, message_queue_property(Id, Prop)) - ; recorded('$queue', q(Id,_,_,_,_), _) - ), - '$check_message_queue_property'(Prop, message_queue_property(Id, Prop)), - '$message_queue_id_alias'(Id0, Id), - '$message_queue_property'(Id0, Prop). +/* @pred message_queue_property(+ _Queue_) -'$check_message_queue_or_alias'(Term, Goal) :- - var(Term), !, - '$do_error'(instantiation_error, Goal). -'$check_message_queue_or_alias'(Term, Goal) :- - \+ integer(Term), - \+ atom(Term), - Term \= '$message_queue'(_), !, - '$do_error'(domain_error(queue_or_alias, Term), Goal). -'$check_message_queue_or_alias'('$message_queue'(I), Goal) :- - \+ recorded('$queue', q(_,_,_,I,_), _), !, - '$do_error'(existence_error(queue, '$message_queue'(I)), Goal). -'$check_message_queue_or_alias'(Term, Goal) :- - \+ recorded('$queue', q(Term,_,_,_,_), _), !, - '$do_error'(existence_error(queue, Term), Goal). -'$check_message_queue_or_alias'(_, _). -'$message_queue_id_alias'(Id, Alias) :- - recorded('$queue', q(Alias,_,_,Id,_), _), !. -'$message_queue_id_alias'(Id, Id). +Report on the alias and number of messages stored in a queue created +with message_queue_create/1. -'$check_message_queue_property'(Term, _) :- - var(Term), !. -'$check_message_queue_property'(alias(_), _) :- !. -'$check_message_queue_property'(size(_), _) :- !. -'$check_message_queue_property'(max_size(_), _) :- !. -'$check_message_queue_property'(Term, Goal) :- - '$do_error'(domain_error(queue_property, Term), Goal). ++ `alias(Alias)` report the alias for stream _S_. It can also be used +to enumerate all message queues that have aliases, including anonymous +queues. + ++ `size(Size)` unifies _Size_ with the number of messages in the queue. +*/ + +message_queue_property( Id, alias(Alias) ) :- + recorded('$thread_alias',[Id|Alias],_). +message_queue_property( Alias, size(Size) ) :- + ground(Alias), + recorded('$thread_alias',[Id|Alias],_), + '$message_queue_size'(Id, Size). +message_queue_property( Id, size(Size) ) :- + '$message_queue_size'(Id, Size). -'$message_queue_property'(Id, alias(Alias)) :- - recorded('$queue', q(Alias,_,_,Id,_), _). /** @pred thread_send_message(+ _Term_) @@ -1415,7 +1394,7 @@ thread_peek_message(Queue, Term) :- var(Queue), !, '$do_error'(instantiation_error,thread_peek_message(Queue,Term)). thread_peek_message(Queue, Term) :- recorded('$thread_alias',[Id|Queue],_R), !, - '$message_peek_message'(Id, Term). + '$message_queue_peek'(Id, Term). tthread_peek_message(Queue, Term) :- '$message_queue_peek'(Queue, Term).