/************************************************************************* * * * YAP Prolog * * * * Yap Prolog was developed at NCCUP - Universidade do Porto * * * * Copyright L.Damas, V.S.Costa and Universidade do Porto 1985-1997 * * * ************************************************************************** * * * File: threads.yap * * Last rev: 8/2/88 * * mods: * * comments: support threads * * * *************************************************************************/ /** @ingroup Threads @{ */ :- system_module( '$_threads', [current_mutex/3, current_thread/2, message_queue_create/1, message_queue_create/2, message_queue_destroy/1, message_queue_property/2, mutex_create/1, mutex_create/2, mutex_destroy/1, mutex_lock/1, mutex_property/2, mutex_trylock/1, mutex_unlock/1, mutex_unlock_all/0, thread_at_exit/1, thread_cancel/1, thread_create/1, thread_create/2, thread_create/3, thread_default/1, thread_defaults/1, thread_detach/1, thread_exit/1, thread_get_message/1, thread_get_message/2, thread_join/2, (thread_local)/1, thread_peek_message/1, thread_peek_message/2, thread_property/1, thread_property/2, thread_self/1, thread_send_message/1, thread_send_message/2, thread_set_default/1, thread_set_defaults/1, thread_signal/2, thread_sleep/1, threads/0, (volatile)/1, with_mutex/2], ['$reinit_thread0'/0, '$thread_gfetch'/1, '$thread_local'/2]). :- use_system_module( '$_boot', ['$check_callable'/2, '$run_at_thread_start'/0, '$system_catch'/4]). :- use_system_module( '$_errors', ['$do_error'/2]). /** @pred thread_at_exit(: _Term_) Run _Goal_ just before releasing the thread resources. This is to be compared to `at_halt/1`, but only for the current thread. These hooks are ran regardless of why the execution of the thread has been completed. As these hooks are run, the return-code is already available through thread_property/2 using the result of thread_self/1 as thread-identifier. If you want to guarantee the execution of an exit hook no matter how the thread terminates (the thread can be aborted before reaching the thread_at_exit/1 call), consider using instead the `at_exit/1` option of thread_create/3. */ /** @pred thread_create(: _Goal_) Create a new Prolog detached thread using default options. See thread_create/3. */ /** @pred thread_create(: _Goal_, - _Id_) Create a new Prolog thread using default options. See thread_create/3. */ /** @pred thread_signal(+ _ThreadId_, : _Goal_) Make thread _ThreadId_ execute _Goal_ at the first opportunity. In the current implementation, this implies at the first pass through the Call-port. The predicate thread_signal/2 itself places _Goal_ into the signalled-thread's signal queue and returns immediately. Signals (interrupts) do not cooperate well with the world of multi-threading, mainly because the status of mutexes cannot be guaranteed easily. At the call-port, the Prolog virtual machine holds no locks and therefore the asynchronous execution is safe. _Goal_ can be any valid Prolog goal, including throw/1 to make the receiving thread generate an exception and trace/0 to start tracing the receiving thread. */ :- meta_predicate thread_initialization(0), thread_at_exit(0), thread_create(0, -, :), thread_create(0, -), thread_create(0), thread_signal(+, 0), with_mutex(+, 0), thread_signal(+,0), volatile(:). volatile(P) :- var(P), throw(error(instantiation_error,volatile(P))). volatile(M:P) :- '$do_volatile'(P,M). volatile((G1,G2)) :- '$current_module'(M), '$do_volatile'(G1,M), '$do_volatile'(G2,M). volatile(P) :- '$current_module'(M), '$do_volatile'(P,M). '$do_volatile'(P,M) :- dynamic(M:P). :- initialization('$init_thread0'). '$init_thread0' :- recorda('$thread_alias', [0|main], _), fail. '$init_thread0' :- '$no_threads', !. '$init_thread0' :- recorda('$thread_defaults', [0, 0, 0, false, true], _), '$create_thread_mq'(0), '$new_mutex'(Id), assert_static(prolog:'$with_mutex_mutex'(Id)). '$reinit_thread0' :- '$no_threads', !. '$reinit_thread0' :- '$create_thread_mq'(0), % abolish(prolog:'$with_mutex_mutex',1), '$new_mutex'(Id), asserta_static((prolog:'$with_mutex_mutex'(Id) :- !)). '$top_thread_goal'(G, Detached) :- '$thread_self'(Id), (Detached == true -> '$detach_thread'(Id) ; true), '$current_module'(Module), '$run_at_thread_start', % always finish with a throw to make sure we clean stacks. '$system_catch'((G -> throw('$thread_finished'(true)) ; throw('$thread_finished'(false))),Module,Exception,'$close_thread'(Exception,Detached)), % force backtracking and handling exceptions fail. '$close_thread'(Status, _Detached) :- '$thread_zombie_self'(Id0), !, '$record_thread_status'(Id0,Status), '$run_at_thread_exit'(Id0), '$erase_thread_info'(Id0). % OK, we want to ensure atomicity here in case we get an exception while we % are closing down the thread. '$record_thread_status'(Id0,Stat) :- !, '$mk_tstatus_key'(Id0, Key), (recorded(Key, _, R), erase(R), fail ; Stat = '$thread_finished'(Status) -> recorda(Key, Status, _) ; recorda(Key, exception(Stat), _) ). thread_create(Goal) :- G0 = thread_create(Goal), '$check_callable'(Goal, G0), '$thread_options'([detached(true)], [], Stack, Trail, System, Detached, AtExit, G0), '$thread_new_tid'(Id), % '$erase_thread_info'(Id), % this should not be here '$create_thread_mq'(Id), ( '$create_thread'(Goal, Stack, Trail, System, Detached, AtExit, Id) -> true ; '$mk_tstatus_key'(Id, Key), recorda(Key, exception(resource_error(memory)),_) ). thread_create(Goal, Id) :- G0 = thread_create(Goal, Id), '$check_callable'(Goal, G0), ( nonvar(Id) -> '$do_error'(uninstantiation_error(Id),G0) ; true ), '$thread_options'([], [], Stack, Trail, System, Detached, AtExit, G0), '$thread_new_tid'(Id), % '$erase_thread_info'(Id), % this should not be here '$create_thread_mq'(Id), ( '$create_thread'(Goal, Stack, Trail, System, Detached, AtExit, Id) -> true ; '$mk_tstatus_key'(Id, Key), recorda(Key, exception(resource_error(memory)),_) ). thread_create(Goal, Id, Options) :- G0 = thread_create(Goal, Id, Options), '$check_callable'(Goal,G0), ( nonvar(Id) -> '$do_error'(uninstantiation_error(Id),G0) ; true ), '$thread_options'(Options, Alias, Stack, Trail, System, Detached, AtExit, G0), '$thread_new_tid'(Id), % '$erase_thread_info'(Id), % this should not be here '$record_alias_info'(Id, Alias), '$create_thread_mq'(Id), ( '$create_thread'(Goal, Stack, Trail, System, Detached, AtExit, Id) -> true ; '$mk_tstatus_key'(Id, Key), recorda(Key, exception(resource_error(memory)),_) ). '$erase_thread_info'(Id) :- recorded('$thread_alias',[Id|_],R), erase(R), fail. '$erase_thread_info'(Id) :- recorded('$thread_exit_hook', [Id|_], R), erase(R), fail. '$erase_thread_info'(Id) :- recorded('$queue',q(Id,_,_,_,QKey),_), '$empty_mqueue'(QKey), fail. '$erase_thread_info'(_). '$thread_options'(Opts, Alias, Stack, Trail, System, Detached, AtExit, G) :- strip_module(Opts, Mod, LOpts), ( var(Opts) -> '$do_error'(instantiation_error,G) ; var(Mod) -> '$do_error'(instantiation_error,G) ; \+ atom(Mod) -> '$do_error'(uninstantiation_error(Mod),G) ; var(LOpts) -> '$do_error'(instantiation_error,G) ; '$thread_options'(LOpts, Alias, Stack, Trail, System, Detached, AtExit, Mod, G) ). '$thread_options'([], _, Stack, Trail, System, Detached, AtExit, M, _) :- recorded('$thread_defaults', [DefaultStack, DefaultTrail, DefaultSystem, DefaultDetached, DefaultAtExit], _), ( var(Stack) -> Stack = DefaultStack; true ), ( var(Trail) -> Trail = DefaultTrail; true ), ( var(System) -> System = DefaultSystem; true ), ( var(Detached) -> Detached = DefaultDetached; true ), ( var(AtExit) -> AtExit = DefaultAtExit; true ). '$thread_options'([Opt|Opts], Alias, Stack, Trail, System, Detached, AtExit, M, G0) :- '$thread_option'(Opt, Alias, Stack, Trail, System, Detached, AtExit, M, G0), '$thread_options'(Opts, Alias, Stack, Trail, System, Detached, AtExit, M, G0). '$thread_option'(Option, _, _, _, _, _, _, _, G0) :- var(Option), !, '$do_error'(instantiation_error,G0). '$thread_option'(alias(Alias), Alias, _, _, _, _, _, _, G0) :- !, ( \+ atom(Alias) -> '$do_error'(type_error(atom,Alias),G0) ; true ). '$thread_option'(stack(Stack), _, Stack, _, _, _, _, _, G0) :- !, ( \+ integer(Stack) -> '$do_error'(type_error(integer,Stack),G0) ; true ). '$thread_option'(trail(Trail), _, _, Trail, _, _, _, _, G0) :- !, ( \+ integer(Trail) -> '$do_error'(type_error(integer,Trail),G0) ; true ). '$thread_option'(system(System), _, _, _, System, _, _, _, G0) :- !, ( \+ integer(System) -> '$do_error'(type_error(integer,System),G0) ; true ). '$thread_option'(detached(Detached), _, _, _, _, Detached, _, _, G0) :- !, ( Detached \== true, Detached \== false -> '$do_error'(domain_error(thread_option,Detached+[true,false]),G0) ; true ). '$thread_option'(at_exit(AtExit), _, _, _, _, _, AtExit, M, G0) :- !, ( \+ callable(AtExit) -> '$do_error'(type_error(callable,AtExit),G0) ; true ). % succeed silently, like SWI. '$thread_option'(Option, _, _, _, _, _, _, _, G0). % '$do_error'(domain_error(thread_option,Option),G0). '$record_alias_info'(_, Alias) :- var(Alias), !. '$record_alias_info'(_, Alias) :- recorded('$thread_alias', [_|Alias], _), !, '$do_error'(permission_error(create,thread,alias(Alias)), Goal). '$record_alias_info'(Id, Alias) :- recorda('$thread_alias', [Id|Alias], _). % vsc: ????? thread_defaults(Defaults) :- nonvar(Defaults), !, '$do_error'(uninstantiation_error(Defaults), thread_defaults(Defaults)). thread_defaults([stack(Stack), trail(Trail), system(System), detached(Detached), at_exit(AtExit)]) :- recorded('$thread_defaults',[Stack, Trail, System, Detached, AtExit], _). thread_default(Default) :- var(Default), !, recorded('$thread_defaults', Defaults, _), '$thread_default'(Default, Defaults). thread_default(stack(Stack)) :- !, recorded('$thread_defaults',[Stack, _, _, _, _], _). thread_default(trail(Trail)) :- !, recorded('$thread_defaults',[_, Trail, _, _, _], _). thread_default(system(System)) :- !, recorded('$thread_defaults',[_, _, System, _, _], _). thread_default(detached(Detached)) :- !, recorded('$thread_defaults',[_, _, _, Detached, _], _). thread_default(at_exit(AtExit)) :- !, recorded('$thread_defaults',[_, _, _, _, AtExit], _). thread_default(Default) :- '$do_error'(type_error(thread_option,Default),thread_default(Default)). '$thread_default'(stack(Stack), [Stack, _, _, _, _]). '$thread_default'(trail(Trail), [_, Trail, _, _, _]). '$thread_default'(stack(System), [_, _, System, _, _]). '$thread_default'(detached(Detached), [_, _, _, Detached, _]). '$thread_default'(at_exit(AtExit), [_, _, _, _, AtExit]). thread_set_defaults(V) :- var(V), !, '$do_error'(instantiation_error, thread_set_defaults(V)). thread_set_defaults([Default| Defaults]) :- !, '$thread_set_defaults'([Default| Defaults], thread_set_defaults([Default| Defaults])). thread_set_defaults(T) :- '$do_error'(type_error(list, T), thread_set_defaults(T)). '$thread_set_defaults'([], _). '$thread_set_defaults'([Default| Defaults], G) :- !, '$thread_set_default'(Default, G), '$thread_set_defaults'(Defaults, G). thread_set_default(V) :- var(V), !, '$do_error'(instantiation_error, thread_set_default(V)). thread_set_default(Default) :- '$thread_set_default'(Default, thread_set_default(Default)). '$thread_set_default'(stack(Stack), G) :- \+ integer(Stack), !, '$do_error'(type_error(integer, Stack), G). '$thread_set_default'(stack(Stack), G) :- Stack < 0, !, '$do_error'(domain_error(not_less_than_zero, Stack), G). '$thread_set_default'(stack(Stack), _) :- !, recorded('$thread_defaults', [_, Trail, System, Detached, AtExit], Ref), erase(Ref), recorda('$thread_defaults', [Stack, Trail, System, Detached, AtExit], _). '$thread_set_default'(trail(Trail), G) :- \+ integer(Trail), !, '$do_error'(type_error(integer, Trail), G). '$thread_set_default'(trail(Trail), G) :- Trail < 0, !, '$do_error'(domain_error(not_less_than_zero, Trail), G). '$thread_set_default'(trail(Trail), _) :- !, recorded('$thread_defaults', [Stack, _, System, Detached, AtExit], Ref), erase(Ref), recorda('$thread_defaults', [Stack, Trail, System, Detached, AtExit], _). '$thread_set_default'(system(System), G) :- \+ integer(System), !, '$do_error'(type_error(integer, System), G). '$thread_set_default'(system(System), G0) :- System < 0, !, '$do_error'(domain_error(not_less_than_zero, System), G0). '$thread_set_default'(system(System), _) :- !, recorded('$thread_defaults', [Stack, Trail, _, Detached, AtExit], Ref), erase(Ref), recorda('$thread_defaults', [Stack, Trail, System, Detached, AtExit], _). '$thread_set_default'(detached(Detached), G) :- Detached \== true, Detached \== false, !, '$do_error'(type_error(boolean, Detached), G). '$thread_set_default'(detached(Detached), _) :- !, recorded('$thread_defaults', [Stack, Trail, System, _, AtExit], Ref), erase(Ref), recorda('$thread_defaults', [Stack, Trail, System, Detached, AtExit], _). '$thread_set_default'(at_exit(AtExit), G) :- \+ callable(AtExit), !, '$do_error'(type_error(callable, AtExit), G). '$thread_set_default'(at_exit(AtExit), _) :- !, recorded('$thread_defaults', [Stack, Trail, System, Detached, _], Ref), erase(Ref), '$current_module'(M), recorda('$thread_defaults', [Stack, Trail, System, Detached, M:AtExit], _). '$thread_set_default'(Default, G) :- '$do_error'(domain_error(thread_default, Default), G). /** @pred thread_self(- _Id_) Get the Prolog thread identifier of the running thread. If the thread has an alias, the alias-name is returned. */ thread_self(Id) :- nonvar(Id), \+ integer(Id), \+ atom(Id), !, '$do_error'(domain_error(thread_or_alias, Id), thread_self(Id)). thread_self(Id) :- '$thread_self'(Id0), '$thread_id_alias'(Id0, Id). /* Exit status may be either true, false, exception(Term), or exited(Term) */ /** @pred thread_join(+ _Id_, - _Status_) Wait for the termination of thread with given _Id_. Then unify the result-status of the thread with _Status_. After this call, _Id_ becomes invalid and all resources associated with the thread are reclaimed. Note that threads with the attribute `detached` `true` cannot be joined. See also current_thread/2. A thread that has been completed without thread_join/2 being called on it is partly reclaimed: the Prolog stacks are released and the C-thread is destroyed. A small data-structure representing the exit-status of the thread is retained until thread_join/2 is called on the thread. Defined values for _Status_ are: + true The goal has been proven successfully. + false The goal has failed. + exception( _Term_) The thread is terminated on an exception. See print_message/2 to turn system exceptions into readable messages. + exited( _Term_) The thread is terminated on thread_exit/1 using the argument _Term_. + thread_detach(+ _Id_) Switch thread into detached-state (see `detached` option at thread_create/3 at runtime. _Id_ is the identifier of the thread placed in detached state. One of the possible applications is to simplify debugging. Threads that are created as `detached` leave no traces if they crash. For not-detached threads the status can be inspected using current_thread/2. Threads nobody is waiting for may be created normally and detach themselves just before completion. This way they leave no traces on normal completion and their reason for failure can be inspected. */ thread_join(Id, Status) :- nonvar(Status), !, '$do_error'(uninstantiation_error(Status),thread_join(Id, Status)). thread_join(Id, Status) :- '$check_thread_or_alias'(Id, thread_join(Id, Status)), '$thread_id_alias'(Id0, Id), '$thread_join'(Id0), '$mk_tstatus_key'(Id0, Key), recorded(Key, Status, R), erase(R), '$thread_destroy'(Id0). thread_cancel(Id) :- (Id == main; Id == 0), !, '$do_error'(permission_error(cancel, thread, main), thread_cancel(Id)). thread_cancel(Id) :- thread_signal(Id, throw(error(thread_cancel(Id),thread_cancel(Id)))). thread_detach(Id) :- '$check_thread_or_alias'(Id, thread_detach(Id)), '$thread_id_alias'(Id0, Id), '$detach_thread'(Id0), '$mk_tstatus_key'(Id0, Key), ( recorded(Key, _, _) -> '$erase_thread_info'(Id0), '$thread_destroy'(Id0) ; '$thread_unlock'(Id0) ). /** @pred thread_exit(+ _Term_) Terminates the thread immediately, leaving `exited( _Term_)` as result-state for thread_join/2. If the thread has the attribute `detached` `true` it terminates, but its exit status cannot be retrieved using thread_join/2 making the value of _Term_ irrelevant. The Prolog stacks and C-thread are reclaimed. */ thread_exit(Term) :- var(Term), !, '$do_error'(instantiation_error, thread_exit(Term)). thread_exit(Term) :- throw('$thread_finished'(exited(Term))). '$run_at_thread_exit'(Id0) :- '$thread_run_at_exit'(G, M), catch(once(M:G), _, fail), fail. '$run_at_thread_exit'(Id0) :- recorded('$thread_exit_hook',[Id0|Hook],R), erase(R), catch(once(Hook),_,fail), fail. '$run_at_thread_exit'(_). thread_at_exit(Goal) :- '$check_callable'(Goal,thread_at_exit(Goal)), '$thread_self'(Id0), recordz('$thread_exit_hook',[Id0|Goal],_). /** @pred current_thread(+ _Id_, - _Status_) Enumerates identifiers and status of all currently known threads. Calling current_thread/2 does not influence any thread. See also thread_join/2. For threads that have an alias-name, this name is returned in _Id_ instead of the numerical thread identifier. _Status_ is one of: + running The thread is running. This is the initial status of a thread. Please note that threads waiting for something are considered running too. + false The _Goal_ of the thread has been completed and failed. + true The _Goal_ of the thread has been completed and succeeded. + exited( _Term_) The _Goal_ of the thread has been terminated using thread_exit/1 with _Term_ as argument. If the underlying native thread has exited (using pthread_exit()) _Term_ is unbound. + exception( _Term_) The _Goal_ of the thread has been terminated due to an uncaught exception (see throw/1 and catch/3). */ current_thread(Id, Status) :- catch(thread_property(Id, status(Status)), error(existence_error(_,_),_), fail). '$thread_id_alias'(Id, Alias) :- recorded('$thread_alias', [Id|Alias], _), !. '$thread_id_alias'(Id, Id). '$mutex_id_alias'(Id, Alias) :- recorded('$mutex_alias', [Id|Alias], _), !. '$mutex_id_alias'(Id, Id). /** @pred mutex_create(? _MutexId_) Create a mutex. if _MutexId_ is an atom, a named mutex is created. If it is a variable, an anonymous mutex reference is returned. There is no limit to the number of mutexes that can be created. */ mutex_create(Mutex) :- ( atom(Mutex) -> mutex_create(_, [alias(Mutex)]) ; mutex_create(Mutex, []) ). mutex_create(Id, Options) :- nonvar(Id), !, '$do_error'(uninstantiation_error(Id), mutex_create(Id, Options)). mutex_create(Id, Options) :- Goal = mutex_create(Id, Options), '$mutex_options'(Options, Alias, Goal), ( atom(Alias) -> ( recorded('$mutex_alias',[_| Alias], _) -> '$do_error'(permission_error(create, mutex, Alias), Goal) ; '$new_mutex'(Id), recorda('$mutex_alias', [Id| Alias], _) ) ; '$new_mutex'(Id), recorda('$mutex_alias', [Id| Id], _) ). '$mutex_options'(Var, _, Goal) :- var(Var), !, '$do_error'(instantiation_error, Goal). '$mutex_options'([], _, _) :- !. '$mutex_options'([Option| Options], Alias, Goal) :- !, '$mutex_option'(Option, Alias, Goal), '$mutex_options'(Options, Alias, Goal). '$mutex_options'(Options, _, Goal) :- '$do_error'(type_error(list, Options), Goal). '$mutex_option'(Var, _, Goal) :- var(Var), !, '$do_error'(instantiation_error, Goal). '$mutex_option'(alias(Alias), Alias, Goal) :- !, ( atom(Alias) -> true ; '$do_error'(type_error(atom, Alias), Goal) ). '$mutex_option'(Option, _, Goal) :- '$do_error'(domain_error(mutex_option, Option), Goal). /* mutex_create(V) :- var(V), !, '$new_mutex'(V), recorda('$mutex_alias',[V|V],_). mutex_create(A) :- atom(A), recorded('$mutex_alias',[_|A],_), !, '$do_error'(permission_error(create,mutex,A),mutex_create(A)). mutex_create(A) :- atom(A), !, '$new_mutex'(Id), recorda('$mutex_alias',[Id|A],_). mutex_create(V) :- '$do_error'(type_error(atom,V),mutex_create(V)). */ /** @pred mutex_destroy(+ _MutexId_) Destroy a mutex. After this call, _MutexId_ becomes invalid and further references yield an `existence_error` exception. */ mutex_destroy(Mutex) :- '$check_mutex_or_alias'(Mutex, mutex_destroy(Mutex)), '$mutex_id_alias'(Id, Mutex), '$destroy_mutex'(Id), '$erase_mutex_info'(Id). '$erase_mutex_info'(Id) :- recorded('$mutex_alias',[Id|_],R), erase(R), fail. '$erase_mutex_info'(_). /** @pred mutex_lock(+ _MutexId_) Lock the mutex. Prolog mutexes are recursive mutexes: they can be locked multiple times by the same thread. Only after unlocking it as many times as it is locked, the mutex becomes available for locking by other threads. If another thread has locked the mutex the calling thread is suspended until to mutex is unlocked. If _MutexId_ is an atom, and there is no current mutex with that name, the mutex is created automatically using mutex_create/1. This implies named mutexes need not be declared explicitly. Please note that locking and unlocking mutexes should be paired carefully. Especially make sure to unlock mutexes even if the protected code fails or raises an exception. For most common cases use with_mutex/2, which provides a safer way for handling Prolog-level mutexes. */ mutex_lock(V) :- var(V), !, '$do_error'(instantiation_error,mutex_lock(V)). mutex_lock(A) :- recorded('$mutex_alias',[Id|A],_), !, '$lock_mutex'(Id). mutex_lock(A) :- atom(A), !, mutex_create(A), mutex_lock(A). mutex_lock(V) :- '$do_error'(type_error(atom,V),mutex_lock(V)). /** @pred mutex_trylock(+ _MutexId_) As mutex_lock/1, but if the mutex is held by another thread, this predicates fails immediately. */ mutex_trylock(V) :- var(V), !, '$do_error'(instantiation_error,mutex_trylock(V)). mutex_trylock(A) :- recorded('$mutex_alias',[Id|A],_), !, '$trylock_mutex'(Id). mutex_trylock(A) :- atom(A), !, mutex_create(A), mutex_trylock(A). mutex_trylock(V) :- '$do_error'(type_error(atom,V),mutex_trylock(V)). /** @pred mutex_unlock(+ _MutexId_) Unlock the mutex. This can only be called if the mutex is held by the calling thread. If this is not the case, a `permission_error` exception is raised. */ mutex_unlock(Mutex) :- '$check_mutex_or_alias'(Mutex, mutex_unlock(Mutex)), '$mutex_id_alias'(Id, Mutex), ( '$unlock_mutex'(Id) -> true ; '$do_error'(permission_error(unlock,mutex,Mutex),mutex_unlock(Mutex)) ). /** @pred mutex_unlock_all Unlock all mutexes held by the current thread. This call is especially useful to handle thread-termination using abort/0 or exceptions. See also thread_signal/2. */ mutex_unlock_all :- '$thread_self'(Tid), '$unlock_all_thread_mutexes'(Tid). '$unlock_all_thread_mutexes'(Tid) :- recorded('$mutex_alias',[Id|_],_), '$mutex_info'(Id, NRefs, Tid), NRefs > 0, '$mutex_unlock_all'(Id), fail. '$unlock_all_thread_mutexes'(_). '$mutex_unlock_all'(Id) :- '$mutex_info'(Id, NRefs, _), NRefs > 0, '$unlock_mutex'(Id), '$mutex_unlock_all'(Id). with_mutex(M, G) :- ( '$no_threads' -> once(G) ; var(M) -> '$do_error'(instantiation_error,with_mutex(M, G)) ; var(G) -> '$do_error'(instantiation_error,with_mutex(M, G)) ; \+ callable(G) -> '$do_error'(type_error(callable,G),with_mutex(M, G)) ; atom(M) -> '$with_mutex_mutex'(WMId), '$lock_mutex'(WMId), ( recorded('$mutex_alias',[Id|M],_) -> true ; '$new_mutex'(Id), recorda('$mutex_alias',[Id|M],_) ), '$unlock_mutex'(WMId), '$lock_mutex'(Id), ( catch('$execute'(G), E, ('$unlock_mutex'(Id), throw(E))) -> '$unlock_mutex'(Id) ; '$unlock_mutex'(Id), fail ) ; '$do_error'(type_error(atom,M),with_mutex(M, G)) ). /** @pred current_mutex(? _MutexId_, ? _ThreadId_, ? _Count_) Enumerates all existing mutexes. If the mutex is held by some thread, _ThreadId_ is unified with the identifier of the holding thread and _Count_ with the recursive count of the mutex. Otherwise, _ThreadId_ is `[]` and _Count_ is 0. */ current_mutex(M, T, NRefs) :- recorded('$mutex_alias',[Id|M],_), '$mutex_info'(Id, NRefs, T). mutex_property(Mutex, Prop) :- ( nonvar(Mutex) -> '$check_mutex_or_alias'(Mutex, mutex_property(Mutex, Prop)) ; recorded('$mutex_alias', [_|Mutex], _) ), '$check_mutex_property'(Prop, mutex_property(Mutex, Prop)), '$mutex_id_alias'(Id, Mutex), '$mutex_property'(Id, Prop). '$mutex_property'(Id, alias(Alias)) :- recorded('$mutex_alias', [Id|Alias], _), Id \= Alias. '$mutex_property'(Id, status(Status)) :- '$mutex_info'(Id, Count, HoldingThread), ( Count =:= 0 -> Status = unlocked ; % Count > 0, '$thread_id_alias'(HoldingThread, Alias), once((Thread = Alias; Thread = HoldingThread)), Status = locked(Thread, Count) ). message_queue_create(Id, Options) :- nonvar(Id), !, '$do_error'(uninstantiation_error(Id), message_queue_create(Id, Options)). message_queue_create(Id, Options) :- var(Options), !, '$do_error'(instantiation_error, message_queue_create(Id, Options)). message_queue_create(Id, []) :- !, '$do_msg_queue_create'(Id). message_queue_create(Id, [alias(Alias)]) :- var(Alias), !, '$do_error'(instantiation_error, message_queue_create(Id, [alias(Alias)])). message_queue_create(Id, [alias(Alias)]) :- \+ atom(Alias), !, '$do_error'(type_error(atom,Alias), message_queue_create(Id, [alias(Alias)])). message_queue_create(Id, [alias(Alias)]) :- !, '$new_mutex'(Mutex), '$cond_create'(Cond), ( recorded('$queue', q(Alias,_,_,_,_), _) -> '$do_error'(permission_error(create,queue,alias(Alias)),message_queue_create(Id, [alias(Alias)])) ; recorded('$thread_alias', [_|Alias], _) -> '$do_error'(permission_error(create,queue,alias(Alias)),message_queue_create(Id, [alias(Alias)])) ; '$mq_new_id'(Id, NId, Key), recorda('$queue',q(Alias,Mutex,Cond,NId,Key), _) ). message_queue_create(Id, [Option| _]) :- '$do_error'(domain_error(queue_option, Option), message_queue_create(Id, [Option| _])). message_queue_create(Id, Options) :- '$do_error'(type_error(list, Options), message_queue_create(Id, Options)). /** @pred message_queue_create(? _Queue_) If _Queue_ is an atom, create a named queue. To avoid ambiguity on `thread_send_message/2`, the name of a queue may not be in use as a thread-name. If _Queue_ is unbound an anonymous queue is created and _Queue_ is unified to its identifier. */ message_queue_create(Id) :- ( var(Id) -> % ISO DTR message_queue_create(Id, []) ; atom(Id) -> % old behavior message_queue_create(_, [alias(Id)]) ; '$do_error'(uninstantiation_error(Id), message_queue_create(Id)) ). '$do_msg_queue_create'(Id) :- \+ recorded('$queue',q(Id,_,_,_,_), _), '$new_mutex'(Mutex), '$cond_create'(Cond), '$mq_new_id'(Id, NId, Key), recorda('$queue',q(Id,Mutex,Cond,NId,Key), _), fail. '$do_msg_queue_create'(_). '$create_thread_mq'(TId) :- recorded('$queue',q(TId,_,_,_,_), R), erase(R), fail. '$create_thread_mq'(TId) :- \+ recorded('$queue',q(TId,_,_,_,_), _), '$new_mutex'(Mutex), '$cond_create'(Cond), '$mq_new_id'(TId, TId, Key), recorda('$queue', q(TId,Mutex,Cond,TId,Key), _), fail. % recover space '$create_thread_mq'(_). '$mq_new_id'(Id, Id, AtId) :- integer(Id), !, \+ recorded('$queue', q(_,_,_,Id,_), _), '$init_db_queue'(AtId). '$mq_new_id'(_, Id, AtId) :- '$integers'(Id), \+ recorded('$queue', q(_,_,_,Id,_), _), !, '$init_db_queue'(AtId). '$integers'(-1). '$integers'(I) :- '$integers'(I1), I is I1-1. /** @pred message_queue_destroy(+ _Queue_) Destroy a message queue created with message_queue_create/1. It is not allows to destroy the queue of a thread. Neither is it allowed to destroy a queue other threads are waiting for or, for anonymous message queues, may try to wait for later. */ message_queue_destroy(Name) :- var(Name), !, '$do_error'(instantiation_error,message_queue_destroy(Name)). message_queue_destroy(Name) :- '$message_queue_destroy'(Name), fail. message_queue_destroy(_). '$message_queue_destroy'(Queue) :- recorded('$queue',q(Queue,Mutex,Cond,_,QKey),R), !, '$clean_mqueue'(QKey), '$cond_destroy'(Cond), '$destroy_mutex'(Mutex), erase(R). '$message_queue_destroy'(Queue) :- atomic(Queue), !, '$do_error'(existence_error(message_queue,Queue),message_queue_destroy(Queue)). '$message_queue_destroy'(Name) :- '$do_error'(type_error(atom,Name),message_queue_destroy(Name)). '$clean_mqueue'(Queue) :- '$db_dequeue'(Queue, _), fail. '$clean_mqueue'(_). '$empty_mqueue'(Queue) :- '$db_dequeue_unlocked'(Queue, _), fail. '$empty_mqueue'(_). message_queue_property(Id, Prop) :- ( nonvar(Id) -> '$check_message_queue_or_alias'(Id, message_queue_property(Id, Prop)) ; recorded('$queue', q(Id,_,_,_,_), _) ), '$check_message_queue_property'(Prop, message_queue_property(Id, Prop)), '$message_queue_id_alias'(Id0, Id), '$message_queue_property'(Id0, Prop). '$check_message_queue_or_alias'(Term, Goal) :- var(Term), !, '$do_error'(instantiation_error, Goal). '$check_message_queue_or_alias'(Term, Goal) :- \+ integer(Term), \+ atom(Term), Term \= '$message_queue'(_), !, '$do_error'(domain_error(queue_or_alias, Term), Goal). '$check_message_queue_or_alias'('$message_queue'(I), Goal) :- \+ recorded('$queue', q(_,_,_,I,_), _), !, '$do_error'(existence_error(queue, '$message_queue'(I)), Goal). '$check_message_queue_or_alias'(Term, Goal) :- \+ recorded('$queue', q(Term,_,_,_,_), _), !, '$do_error'(existence_error(queue, Term), Goal). '$check_message_queue_or_alias'(_, _). '$message_queue_id_alias'(Id, Alias) :- recorded('$queue', q(Alias,_,_,Id,_), _), !. '$message_queue_id_alias'(Id, Id). '$check_message_queue_property'(Term, _) :- var(Term), !. '$check_message_queue_property'(alias(_), _) :- !. '$check_message_queue_property'(size(_), _) :- !. '$check_message_queue_property'(max_size(_), _) :- !. '$check_message_queue_property'(Term, Goal) :- '$do_error'(domain_error(queue_property, Term), Goal). '$message_queue_property'(Id, alias(Alias)) :- recorded('$queue', q(Alias,_,_,Id,_), _). thread_send_message(Term) :- '$thread_self'(Id), thread_send_message(Id, Term). /** @pred thread_send_message(+ _QueueOrThreadId_, + _Term_) Place _Term_ in the given queue or default queue of the indicated thread (which can even be the message queue of itself (see thread_self/1). Any term can be placed in a message queue, but note that the term is copied to the receiving thread and variable-bindings are thus lost. This call returns immediately. If more than one thread is waiting for messages on the given queue and at least one of these is waiting with a partially instantiated _Term_, the waiting threads are all sent a wakeup signal, starting a rush for the available messages in the queue. This behaviour can seriously harm performance with many threads waiting on the same queue as all-but-the-winner perform a useless scan of the queue. If there is only one waiting thread or all waiting threads wait with an unbound variable an arbitrary thread is restarted to scan the queue. */ thread_send_message(Queue, Term) :- var(Queue), !, '$do_error'(instantiation_error,thread_send_message(Queue,Term)). thread_send_message(Queue, Term) :- recorded('$thread_alias',[Id|Queue],_), !, thread_send_message(Id, Term). thread_send_message(Queue, Term) :- '$do_thread_send_message'(Queue, Term), fail. % release pointers thread_send_message(_, _). '$do_thread_send_message'(Queue, Term) :- recorded('$queue',q(Queue,Mutex,Cond,_,Key),_), !, '$lock_mutex'(Mutex), '$db_enqueue_unlocked'(Key, Term), % write(+Queue:Term),nl, '$cond_signal'(Cond), '$unlock_mutex'(Mutex). '$do_thread_send_message'(Queue, Term) :- '$do_error'(existence_error(queue,Queue),thread_send_message(Queue,Term)). /** @pred thread_get_message(? _Term_) Examines the thread message-queue and if necessary blocks execution until a term that unifies to _Term_ arrives in the queue. After a term from the queue has been unified unified to _Term_, the term is deleted from the queue and this predicate returns. Please note that not-unifying messages remain in the queue. After the following has been executed, thread 1 has the term `gnu` in its queue and continues execution using _A_ is `gnat`. ~~~~~ thread_get_message(a(A)), thread_send_message(b(gnu)), thread_send_message(a(gnat)), ~~~~~ See also thread_peek_message/1. */ thread_get_message(Term) :- '$thread_self'(Id), thread_get_message(Id, Term). /** @pred thread_get_message(+ _Queue_, ? _Term_) As thread_get_message/1, operating on a given queue. It is allowed to peek into another thread's message queue, an operation that can be used to check whether a thread has swallowed a message sent to it. */ thread_get_message(Queue, Term) :- var(Queue), !, '$do_error'(instantiation_error,thread_get_message(Queue,Term)). thread_get_message(Queue, Term) :- recorded('$thread_alias',[Id|Queue],_), !, thread_get_message(Id, Term). thread_get_message(Queue, Term) :- recorded('$queue',q(Queue,Mutex,Cond,_,Key),_), !, '$lock_mutex'(Mutex), % write(-Queue:Term),nl, '$thread_get_message_loop'(Key, Term, Mutex, Cond). thread_get_message(Queue, Term) :- '$do_error'(existence_error(message_queue,Queue),thread_get_message(Queue,Term)). '$thread_get_message_loop'(Key, Term, Mutex, _) :- '$db_dequeue_unlocked'(Key, Term), !, '$unlock_mutex'(Mutex). '$thread_get_message_loop'(Key, Term, Mutex, Cond) :- '$cond_wait'(Cond, Mutex), '$thread_get_message_loop'(Key, Term, Mutex, Cond). /** @pred thread_peek_message(? _Term_) Examines the thread message-queue and compares the queued terms with _Term_ until one unifies or the end of the queue has been reached. In the first case the call succeeds (possibly instantiating _Term_. If no term from the queue unifies this call fails. */ thread_peek_message(Term) :- '$thread_self'(Id), thread_peek_message(Id, Term). /** @pred thread_peek_message(+ _Queue_, ? _Term_) As thread_peek_message/1, operating on a given queue. It is allowed to peek into another thread's message queue, an operation that can be used to check whether a thread has swallowed a message sent to it. Explicit message queues are designed with the worker-pool model in mind, where multiple threads wait on a single queue and pick up the first goal to execute. Below is a simple implementation where the workers execute arbitrary Prolog goals. Note that this example provides no means to tell when all work is done. This must be realised using additional synchronisation. ~~~~~ % create_workers(+Id, +N) % % Create a pool with given Id and number of workers. create_workers(Id, N) :- message_queue_create(Id), forall(between(1, N, _), thread_create(do_work(Id), _, [])). do_work(Id) :- repeat, thread_get_message(Id, Goal), ( catch(Goal, E, print_message(error, E)) -> true ; print_message(error, goal_failed(Goal, worker(Id))) ), fail. % work(+Id, +Goal) % % Post work to be done by the pool work(Id, Goal) :- thread_send_message(Id, Goal). ~~~~~ */ thread_peek_message(Queue, Term) :- var(Queue), !, '$do_error'(instantiation_error,thread_peek_message(Queue,Term)). thread_peek_message(Queue, Term) :- recorded('$thread_alias',[Id|Queue],_), !, thread_peek_message(Id, Term). thread_peek_message(Queue, Term) :- recorded('$queue',q(Queue,Mutex,_,_,Key),_), !, '$lock_mutex'(Mutex), '$thread_peek_message2'(Key, Term, Mutex). thread_peek_message(Queue, Term) :- '$do_error'(existence_error(message_queue,Queue),thread_peek_message(Queue,Term)). '$thread_peek_message2'(Key, Term, Mutex) :- '$db_peek_queue'(Key, Term), !, '$unlock_mutex'(Mutex). '$thread_peek_message2'(_, _, Mutex) :- '$unlock_mutex'(Mutex), fail. /** @pred thread_local( _+Functor/Arity_) related to the dynamic/1 directive. It tells the system that the predicate may be modified using assert/1, retract/1, etc, during execution of the program. Unlike normal shared dynamic data however each thread has its own clause-list for the predicate. As a thread starts, this clause list is empty. If there are still clauses as the thread terminates these are automatically reclaimed by the system. The `thread_local` property implies the property `dynamic`. Thread-local dynamic predicates are intended for maintaining thread-specific state or intermediate results of a computation. It is not recommended to put clauses for a thread-local predicate into a file as in the example below as the clause is only visible from the thread that loaded the source-file. All other threads start with an empty clause-list. ~~~~~ :- thread_local foo/1. foo(gnat). ~~~~~ */ thread_local(X) :- '$current_module'(M), '$thread_local'(X,M). '$thread_local'(X,M) :- var(X), !, '$do_error'(instantiation_error,thread_local(M:X)). '$thread_local'(Mod:Spec,_) :- !, '$thread_local'(Spec,Mod). '$thread_local'([], _) :- !. '$thread_local'([H|L], M) :- !, '$thread_local'(H, M), '$thread_local'(L, M). '$thread_local'((A,B),M) :- !, '$thread_local'(A,M), '$thread_local'(B,M). '$thread_local'(X,M) :- !, '$thread_local2'(X,M). '$thread_local2'(A/N, Mod) :- integer(N), atom(A), !, functor(T,A,N), (Mod \= idb -> '$flags'(T,Mod,F,F) ; true), ( '$install_thread_local'(T,Mod) -> true ; F /\ 0x08002000 =\= 0 -> '$do_error'(permission_error(modify,dynamic_procedure,A/N),thread_local(Mod:A/N)) ; '$do_error'(permission_error(modify,static_procedure,A/N),thread_local(Mod:A/N)) ). '$thread_local2'(X,Mod) :- '$do_error'(type_error(callable,X),thread_local(Mod:X)). /** @pred thread_sleep(+ _Time_) Make current thread sleep for _Time_ seconds. _Time_ may be an integer or a floating point number. When time is zero or a negative value the call succeeds and returns immediately. This call should not be used if alarms are also being used. */ thread_sleep(Time) :- var(Time), !, '$do_error'(instantiation_error,thread_sleep(Time)). thread_sleep(Time) :- integer(Time), !, ( Time > 0 -> '$thread_sleep'(Time,0,_,_) ; true ). thread_sleep(Time) :- float(Time), !, ( Time > 0.0 -> STime is integer(float_integer_part(Time)), NTime is integer(float_fractional_part(Time))*1000000000, '$thread_sleep'(STime,NTime,_,_) ; true ). thread_sleep(Time) :- '$do_error'(type_error(number,Time),thread_sleep(Time)). thread_signal(Id, Goal) :- '$check_thread_or_alias'(Id, thread_signal(Id, Goal)), '$check_callable'(Goal, thread_signal(Id, Goal)), '$thread_id_alias'(Id0, Id), ( recorded('$thread_signal', [Id0| _], R), erase(R), fail ; true ), recorda('$thread_signal', [Id0| Goal], _), '$signal_thread'(Id0). '$thread_gfetch'(G) :- '$thread_self'(Id), recorded('$thread_signal',[Id|G],R), erase(R). thread_property(Prop) :- '$check_thread_property'(Prop, thread_property(Prop)), '$thread_self'(Id), '$thread_property'(Prop, Id). /** @pred thread_property(? _Id_, ? _Property_) Enumerates the properties of the specified thread. Calling thread_property/2 does not influence any thread. See also thread_join/2. For threads that have an alias-name, this name can be used in _Id_ instead of the numerical thread identifier. _Property_ is one of: + status( _Status_) The thread status of a thread (see below). + alias( _Alias_) The thread alias, if it exists. + at_exit( _AtExit_) The thread exit hook, if defined (not available if the thread is already terminated). + detached( _Boolean_) The detached state of the thread. + stack( _Size_) The thread stack data-area size. + trail( _Size_) The thread trail data-area size. + system( _Size_) The thread system data-area size. */ thread_property(Id, Prop) :- ( nonvar(Id) -> '$check_thread_or_alias'(Id, thread_property(Id, Prop)) ; '$enumerate_threads'(Id) ), '$check_thread_property'(Prop, thread_property(Id, Prop)), '$thread_id_alias'(Id0, Id), '$thread_property'(Prop, Id0). '$enumerate_threads'(Id) :- '$max_threads'(Max), Max1 is Max-1, between(0,Max1,Id), '$thread_stacks'(Id, _, _, _). '$thread_property'(alias(Alias), Id) :- recorded('$thread_alias', [Id|Alias], _). '$thread_property'(status(Status), Id) :- '$mk_tstatus_key'(Id, Key), ( recorded(Key, Exit, _) -> Status = Exit ; Status = running ). '$thread_property'(detached(Detached), Id) :- ( '$thread_detached'(Id,Detached) -> true ; Detached = false ). '$thread_property'(at_exit(M:G), Id) :- '$thread_run_at_exit'(G,M). '$thread_property'(stack(Stack), Id) :- '$thread_stacks'(Id, Stack, _, _). '$thread_property'(trail(Trail), Id) :- '$thread_stacks'(Id, _, Trail, _). '$thread_property'(system(System), Id) :- '$thread_stacks'(Id, _, _, System). threads :- format(user_error,'------------------------------------------------------------------------~n',[]), format(user_error, '~t~a~48+~n', 'Thread Detached Status'), format(user_error,'------------------------------------------------------------------------~n',[]), thread_property(Id, detached(Detached)), thread_property(Id, status(Status)), '$thread_id_alias'(Id, Alias), format(user_error,'~t~q~30+~33|~w~42|~q~n', [Alias, Detached, Status]), fail. threads :- format(user_error,'------------------------------------------------------------------------~n',[]). '$check_thread_or_alias'(Term, Goal) :- var(Term), !, '$do_error'(instantiation_error, Goal). '$check_thread_or_alias'(Term, Goal) :- \+ integer(Term), \+ atom(Term), !, '$do_error'(domain_error(thread_or_alias, Term), Goal). '$check_thread_or_alias'(Term, Goal) :- atom(Term), \+ recorded('$thread_alias',[_|Term],_), !, '$do_error'(existence_error(thread, Term), Goal). '$check_thread_or_alias'(Term, Goal) :- integer(Term), \+ '$valid_thread'(Term), !, '$do_error'(existence_error(thread, Term), Goal). '$check_thread_or_alias'(_,_). '$check_thread_property'(Term, _) :- var(Term), !. '$check_thread_property'(alias(_), _) :- !. '$check_thread_property'(detached(_), _) :- !. '$check_thread_property'(at_exit(_), _) :- !. '$check_thread_property'(status(_), _) :- !. '$check_thread_property'(stack(_), _) :- !. '$check_thread_property'(trail(_), _) :- !. '$check_thread_property'(system(_), _) :- !. '$check_thread_property'(Term, Goal) :- '$do_error'(domain_error(thread_property, Term), Goal). '$check_mutex_or_alias'(Term, Goal) :- var(Term), !, '$do_error'(instantiation_error, Goal). '$check_mutex_or_alias'(Term, Goal) :- \+ integer(Term), \+ atom(Term), !, '$do_error'(domain_error(mutex_or_alias, Term), Goal). '$check_mutex_or_alias'(Term, Goal) :- atom(Term), \+ recorded('$mutex_alias',[_|Term],_), !, '$do_error'(existence_error(mutex, Term), Goal). '$check_mutex_or_alias'(Term, Goal) :- % integer(Term), \+ '$valid_mutex'(Term), !, integer(Term), \+ recorded('$mutex_alias',[Term|_],_), !, '$do_error'(existence_error(mutex, Term), Goal). '$check_mutex_or_alias'(_,_). '$check_mutex_property'(Term, _) :- var(Term), !. '$check_mutex_property'(alias(_), _) :- !. '$check_mutex_property'(status(Status), Goal) :- !, ( var(Status) -> true ; Status = unlocked -> true ; Status = locked(_, _) -> true ; '$do_error'(domain_error(mutex_property, status(Status)), Goal) ). '$check_mutex_property'(Term, Goal) :- '$do_error'(domain_error(mutex_property, Term), Goal). '$mk_tstatus_key'(Id0, Key) :- atomic_concat('$thread_exit_status__',Id0,Key). /** @pred thread_statistics(+ _Id_, + _Key_, - _Value_) Obtains statistical information on thread _Id_ as `statistics/2` does in single-threaded applications. This call returns all keys of `statistics/2`, although only information statistics about the stacks and CPU time yield different values for each thread. + mutex_statistics Print usage statistics on internal mutexes and mutexes associated with dynamic predicates. For each mutex two numbers are printed: the number of times the mutex was acquired and the number of collisions: the number times the calling thread has to wait for the mutex. The collision-count is not available on Windows as this would break portability to Windows-95/98/ME or significantly harm performance. Generally collision count is close to zero on single-CPU hardware. + threads Prints a table of current threads and their status. */ thread_statistics(Id, Key, Val) :- format("not implemented yet~n",[]). /** @} */