/************************************************************************* * * * YAP Prolog * * * * Yap Prolog was developed at NCCUP - Universidade do Porto * * * * Copyright L.Damas, V.S.Costa and Universidade do Porto 1985-1997 * * * ************************************************************************** * * * File: threads.yap * * Last rev: 8/2/88 * * mods: * * comments: support threads * * * *************************************************************************/ /** @defgroup Threads Threads @ingroup extensions @{ YAP implements a SWI-Prolog compatible multithreading library. Like in SWI-Prolog, Prolog threads have their own stacks and only share the Prolog <em>heap</em>: predicates, records, flags and other global non-backtrackable data. The package is based on the POSIX thread standard (Butenhof:1997:PPT) used on most popular systems except for MS-Windows. */ :- system_module( '$_threads', [current_mutex/3, current_thread/2, message_queue_create/1, message_queue_create/2, message_queue_destroy/1, message_queue_property/2, mutex_create/1, mutex_create/2, mutex_destroy/1, mutex_lock/1, mutex_property/2, mutex_trylock/1, mutex_unlock/1, mutex_unlock_all/0, thread_at_exit/1, thread_cancel/1, thread_create/1, thread_create/2, thread_create/3, thread_default/1, thread_defaults/1, thread_detach/1, thread_exit/1, thread_get_message/1, thread_get_message/2, thread_join/2, (thread_local)/1, thread_peek_message/1, thread_peek_message/2, thread_property/1, thread_property/2, thread_self/1, thread_send_message/1, thread_send_message/2, thread_set_default/1, thread_set_defaults/1, thread_signal/2, thread_sleep/1, threads/0, (volatile)/1, with_mutex/2], ['$reinit_thread0'/0, '$thread_gfetch'/1, '$thread_local'/2]). :- use_system_module( '$_boot', ['$check_callable'/2, '$run_at_thread_start'/0, '$system_catch'/4]). :- use_system_module( '$_errors', ['$do_error'/2]). :- meta_predicate thread_initialization(0), thread_at_exit(0), thread_create(0, -, :), thread_create(0, -), thread_create(0), thread_signal(+, 0), with_mutex(+, 0), thread_signal(+,0), volatile(:). volatile(P) :- var(P), throw(error(instantiation_error,volatile(P))). volatile(M:P) :- '$do_volatile'(P,M). volatile((G1,G2)) :- '$current_module'(M), '$do_volatile'(G1,M), '$do_volatile'(G2,M). volatile(P) :- '$current_module'(M), '$do_volatile'(P,M). '$do_volatile'(P,M) :- dynamic(M:P). /** @defgroup Creating_and_Destroying_Prolog_Threads Creating and Destroying Prolog Threads @ingroup Threads @{ */ :- initialization('$init_thread0'). '$init_thread0' :- recorda('$thread_alias', [0|main], _), fail. '$init_thread0' :- '$no_threads', !. '$init_thread0' :- recorda('$thread_defaults', [0, 0, 0, false, true], _). '$reinit_thread0' :- '$no_threads', !. '$reinit_thread0'. '$top_thread_goal'(G, Detached) :- '$thread_self'(Id), (Detached == true -> '$detach_thread'(Id) ; true), '$current_module'(Module), '$run_at_thread_start', % always finish with a throw to make sure we clean stacks. '$system_catch'((G -> throw('$thread_finished'(true)) ; throw('$thread_finished'(false))),Module,Exception,'$close_thread'(Exception,Detached)), % force backtracking and handling exceptions fail. '$close_thread'(Status, _Detached) :- '$thread_zombie_self'(Id0), !, '$record_thread_status'(Id0,Status), '$run_at_thread_exit'(Id0), '$erase_thread_info'(Id0). % OK, we want to ensure atomicity here in case we get an exception while we % are closing down the thread. '$record_thread_status'(Id0,Stat) :- !, '$mk_tstatus_key'(Id0, Key), (recorded(Key, _, R), erase(R), fail ; Stat = '$thread_finished'(Status) -> recorda(Key, Status, _) ; recorda(Key, exception(Stat), _) ). /** @pred thread_create(: _Goal_) Create a new Prolog detached thread using default options. See thread_create/3. */ thread_create(Goal) :- G0 = thread_create(Goal), '$check_callable'(Goal, G0), '$thread_options'([detached(true)], [], Stack, Trail, System, Detached, AtExit, G0), '$thread_new_tid'(Id), % '$erase_thread_info'(Id), % this should not be here ( '$create_thread'(Goal, Stack, Trail, System, Detached, AtExit, Id) -> true ; '$mk_tstatus_key'(Id, Key), recorda(Key, exception(resource_error(memory)),_) ). /** @pred thread_create(: _Goal_, - _Id_) Create a new Prolog thread using default options. See thread_create/3. */ thread_create(Goal, Id) :- G0 = thread_create(Goal, Id), '$check_callable'(Goal, G0), ( nonvar(Id) -> '$do_error'(uninstantiation_error(Id),G0) ; true ), '$thread_options'([], [], Stack, Trail, System, Detached, AtExit, G0), '$thread_new_tid'(Id), % '$erase_thread_info'(Id), % this should not be here ( '$create_thread'(Goal, Stack, Trail, System, Detached, AtExit, Id) -> true ; '$mk_tstatus_key'(Id, Key), recorda(Key, exception(resource_error(memory)),_) ). /** @pred thread_create(: _Goal_, - _Id_, + _Options_) Create a new Prolog thread (and underlying C-thread) and start it by executing _Goal_. If the thread is created successfully, the thread-identifier of the created thread is unified to _Id_. _Options_ is a list of options. Currently defined options are: + stack Set the limit in K-Bytes to which the Prolog stacks of this thread may grow. If omitted, the limit of the calling thread is used. See also the commandline `-S` option. + trail Set the limit in K-Bytes to which the trail stack of this thread may grow. If omitted, the limit of the calling thread is used. See also the commandline option `-T`. + alias Associate an alias-name with the thread. This named may be used to refer to the thread and remains valid until the thread is joined (see thread_join/2). + at_exit Define an exit hook for the thread. This hook is called when the thread terminates, no matter its exit status. + detached If `false` (default), the thread can be waited for using thread_join/2. thread_join/2 must be called on this thread to reclaim the all resources associated to the thread. If `true`, the system will reclaim all associated resources automatically after the thread finishes. Please note that thread identifiers are freed for reuse after a detached thread finishes or a normal thread has been joined. See also thread_join/2 and thread_detach/1. The _Goal_ argument is <em>copied</em> to the new Prolog engine. This implies further instantiation of this term in either thread does not have consequences for the other thread: Prolog threads do not share data from their stacks. */ thread_create(Goal, Id, Options) :- G0 = thread_create(Goal, Id, Options), '$check_callable'(Goal,G0), ( nonvar(Id) -> '$do_error'(uninstantiation_error(Id),G0) ; true ), '$thread_options'(Options, Alias, Stack, Trail, System, Detached, AtExit, G0), '$thread_new_tid'(Id), % '$erase_thread_info'(Id), % this should not be here '$record_alias_info'(Id, Alias), ( '$create_thread'(Goal, Stack, Trail, System, Detached, AtExit, Id) -> true ; '$mk_tstatus_key'(Id, Key), recorda(Key, exception(resource_error(memory)),_) ). '$erase_thread_info'(Id) :- recorded('$thread_alias',[Id|_],R), erase(R), fail. '$erase_thread_info'(Id) :- recorded('$thread_exit_hook', [Id|_], R), erase(R), fail. '$erase_thread_info'(_). '$thread_options'(Opts, Alias, Stack, Trail, System, Detached, AtExit, G) :- strip_module(Opts, Mod, LOpts), ( var(Opts) -> '$do_error'(instantiation_error,G) ; var(Mod) -> '$do_error'(instantiation_error,G) ; \+ atom(Mod) -> '$do_error'(uninstantiation_error(Mod),G) ; var(LOpts) -> '$do_error'(instantiation_error,G) ; '$thread_options'(LOpts, Alias, Stack, Trail, System, Detached, AtExit, Mod, G) ). '$thread_options'([], _, Stack, Trail, System, Detached, AtExit, _M, _) :- recorded('$thread_defaults', [DefaultStack, DefaultTrail, DefaultSystem, DefaultDetached, DefaultAtExit], _), ( var(Stack) -> Stack = DefaultStack; true ), ( var(Trail) -> Trail = DefaultTrail; true ), ( var(System) -> System = DefaultSystem; true ), ( var(Detached) -> Detached = DefaultDetached; true ), ( var(AtExit) -> AtExit = DefaultAtExit; true ). '$thread_options'([Opt|Opts], Alias, Stack, Trail, System, Detached, AtExit, M, G0) :- '$thread_option'(Opt, Alias, Stack, Trail, System, Detached, AtExit, M, G0), '$thread_options'(Opts, Alias, Stack, Trail, System, Detached, AtExit, M, G0). '$thread_option'(Option, _, _, _, _, _, _, _, G0) :- var(Option), !, '$do_error'(instantiation_error,G0). '$thread_option'(alias(Alias), Alias, _, _, _, _, _, _, G0) :- !, ( \+ atom(Alias) -> '$do_error'(type_error(atom,Alias),G0) ; true ). '$thread_option'(stack(Stack), _, Stack, _, _, _, _, _, G0) :- !, ( \+ integer(Stack) -> '$do_error'(type_error(integer,Stack),G0) ; true ). '$thread_option'(trail(Trail), _, _, Trail, _, _, _, _, G0) :- !, ( \+ integer(Trail) -> '$do_error'(type_error(integer,Trail),G0) ; true ). '$thread_option'(system(System), _, _, _, System, _, _, _, G0) :- !, ( \+ integer(System) -> '$do_error'(type_error(integer,System),G0) ; true ). '$thread_option'(detached(Detached), _, _, _, _, Detached, _, _, G0) :- !, ( Detached \== true, Detached \== false -> '$do_error'(domain_error(thread_option,Detached+[true,false]),G0) ; true ). '$thread_option'(at_exit(AtExit), _, _, _, _, _, AtExit, _M, G0) :- !, ( \+ callable(AtExit) -> '$do_error'(type_error(callable,AtExit),G0) ; true ). % succeed silently, like SWI. '$thread_option'(_Option, _, _, _, _, _, _, _, _G0). % '$do_error'(domain_error(thread_option,Option),G0). '$record_alias_info'(_, Alias) :- var(Alias), !. '$record_alias_info'(_, Alias) :- recorded('$thread_alias', [_|Alias], _), !, '$do_error'(permission_error(create,thread,alias(Alias)), create_thread). '$record_alias_info'(Id, Alias) :- recorda('$thread_alias', [Id|Alias], _). % vsc: ????? thread_defaults(Defaults) :- nonvar(Defaults), !, '$do_error'(uninstantiation_error(Defaults), thread_defaults(Defaults)). thread_defaults([stack(Stack), trail(Trail), system(System), detached(Detached), at_exit(AtExit)]) :- recorded('$thread_defaults',[Stack, Trail, System, Detached, AtExit], _). thread_default(Default) :- var(Default), !, recorded('$thread_defaults', Defaults, _), '$thread_default'(Default, Defaults). thread_default(stack(Stack)) :- !, recorded('$thread_defaults',[Stack, _, _, _, _], _). thread_default(trail(Trail)) :- !, recorded('$thread_defaults',[_, Trail, _, _, _], _). thread_default(system(System)) :- !, recorded('$thread_defaults',[_, _, System, _, _], _). thread_default(detached(Detached)) :- !, recorded('$thread_defaults',[_, _, _, Detached, _], _). thread_default(at_exit(AtExit)) :- !, recorded('$thread_defaults',[_, _, _, _, AtExit], _). thread_default(Default) :- '$do_error'(type_error(thread_option,Default),thread_default(Default)). '$thread_default'(stack(Stack), [Stack, _, _, _, _]). '$thread_default'(trail(Trail), [_, Trail, _, _, _]). '$thread_default'(system(System), [_, _, System, _, _]). '$thread_default'(detached(Detached), [_, _, _, Detached, _]). '$thread_default'(at_exit(AtExit), [_, _, _, _, AtExit]). thread_set_defaults(V) :- var(V), !, '$do_error'(instantiation_error, thread_set_defaults(V)). thread_set_defaults([Default| Defaults]) :- !, '$thread_set_defaults'([Default| Defaults], thread_set_defaults([Default| Defaults])). thread_set_defaults(T) :- '$do_error'(type_error(list, T), thread_set_defaults(T)). '$thread_set_defaults'([], _). '$thread_set_defaults'([Default| Defaults], G) :- !, '$thread_set_default'(Default, G), '$thread_set_defaults'(Defaults, G). thread_set_default(V) :- var(V), !, '$do_error'(instantiation_error, thread_set_default(V)). thread_set_default(Default) :- '$thread_set_default'(Default, thread_set_default(Default)). '$thread_set_default'(stack(Stack), G) :- \+ integer(Stack), !, '$do_error'(type_error(integer, Stack), G). '$thread_set_default'(stack(Stack), G) :- Stack < 0, !, '$do_error'(domain_error(not_less_than_zero, Stack), G). '$thread_set_default'(stack(Stack), _) :- !, recorded('$thread_defaults', [_, Trail, System, Detached, AtExit], Ref), erase(Ref), recorda('$thread_defaults', [Stack, Trail, System, Detached, AtExit], _). '$thread_set_default'(trail(Trail), G) :- \+ integer(Trail), !, '$do_error'(type_error(integer, Trail), G). '$thread_set_default'(trail(Trail), G) :- Trail < 0, !, '$do_error'(domain_error(not_less_than_zero, Trail), G). '$thread_set_default'(trail(Trail), _) :- !, recorded('$thread_defaults', [Stack, _, System, Detached, AtExit], Ref), erase(Ref), recorda('$thread_defaults', [Stack, Trail, System, Detached, AtExit], _). '$thread_set_default'(system(System), G) :- \+ integer(System), !, '$do_error'(type_error(integer, System), G). '$thread_set_default'(system(System), G0) :- System < 0, !, '$do_error'(domain_error(not_less_than_zero, System), G0). '$thread_set_default'(system(System), _) :- !, recorded('$thread_defaults', [Stack, Trail, _, Detached, AtExit], Ref), erase(Ref), recorda('$thread_defaults', [Stack, Trail, System, Detached, AtExit], _). '$thread_set_default'(detached(Detached), G) :- Detached \== true, Detached \== false, !, '$do_error'(type_error(boolean, Detached), G). '$thread_set_default'(detached(Detached), _) :- !, recorded('$thread_defaults', [Stack, Trail, System, _, AtExit], Ref), erase(Ref), recorda('$thread_defaults', [Stack, Trail, System, Detached, AtExit], _). '$thread_set_default'(at_exit(AtExit), G) :- \+ callable(AtExit), !, '$do_error'(type_error(callable, AtExit), G). '$thread_set_default'(at_exit(AtExit), _) :- !, recorded('$thread_defaults', [Stack, Trail, System, Detached, _], Ref), erase(Ref), '$current_module'(M), recorda('$thread_defaults', [Stack, Trail, System, Detached, M:AtExit], _). '$thread_set_default'(Default, G) :- '$do_error'(domain_error(thread_default, Default), G). /** @pred thread_self(- _Id_) Get the Prolog thread identifier of the running thread. If the thread has an alias, the alias-name is returned. */ thread_self(Id) :- nonvar(Id), \+ integer(Id), \+ atom(Id), !, '$do_error'(domain_error(thread_or_alias, Id), thread_self(Id)). thread_self(Id) :- '$thread_self'(Id0), '$thread_id_alias'(Id0, Id). /* Exit status may be either true, false, exception(Term), or exited(Term) */ /** @pred thread_join(+ _Id_, - _Status_) Wait for the termination of thread with given _Id_. Then unify the result-status of the thread with _Status_. After this call, _Id_ becomes invalid and all resources associated with the thread are reclaimed. Note that threads with the attribute `detached` `true` cannot be joined. See also current_thread/2. A thread that has been completed without thread_join/2 being called on it is partly reclaimed: the Prolog stacks are released and the C-thread is destroyed. A small data-structure representing the exit-status of the thread is retained until thread_join/2 is called on the thread. Defined values for _Status_ are: + true The goal has been proven successfully. + false The goal has failed. + exception( _Term_) The thread is terminated on an exception. See print_message/2 to turn system exceptions into readable messages. + exited( _Term_) The thread is terminated on thread_exit/1 using the argument _Term_. + thread_detach(+ _Id_) Switch thread into detached-state (see `detached` option at thread_create/3 at runtime. _Id_ is the identifier of the thread placed in detached state. One of the possible applications is to simplify debugging. Threads that are created as `detached` leave no traces if they crash. For not-detached threads the status can be inspected using current_thread/2. Threads nobody is waiting for may be created normally and detach themselves just before completion. This way they leave no traces on normal completion and their reason for failure can be inspected. */ thread_join(Id, Status) :- nonvar(Status), !, '$do_error'(uninstantiation_error(Status),thread_join(Id, Status)). thread_join(Id, Status) :- '$check_thread_or_alias'(Id, thread_join(Id, Status)), '$thread_id_alias'(Id0, Id), '$thread_join'(Id0), '$mk_tstatus_key'(Id0, Key), recorded(Key, Status, R), erase(R), '$thread_destroy'(Id0). thread_cancel(Id) :- (Id == main; Id == 0), !, '$do_error'(permission_error(cancel, thread, main), thread_cancel(Id)). thread_cancel(Id) :- thread_signal(Id, throw(error(thread_cancel(Id),thread_cancel(Id)))). thread_detach(Id) :- '$check_thread_or_alias'(Id, thread_detach(Id)), '$thread_id_alias'(Id0, Id), '$detach_thread'(Id0), '$mk_tstatus_key'(Id0, Key), ( recorded(Key, _, _) -> '$erase_thread_info'(Id0), '$thread_destroy'(Id0) ; '$thread_unlock'(Id0) ). /** @pred thread_exit(+ _Term_) Terminates the thread immediately, leaving `exited( _Term_)` as result-state for thread_join/2. If the thread has the attribute `detached` `true` it terminates, but its exit status cannot be retrieved using thread_join/2 making the value of _Term_ irrelevant. The Prolog stacks and C-thread are reclaimed. */ thread_exit(Term) :- var(Term), !, '$do_error'(instantiation_error, thread_exit(Term)). thread_exit(Term) :- throw('$thread_finished'(exited(Term))). '$run_at_thread_exit'(_Id0) :- '$thread_run_at_exit'(G, M), catch(once(M:G), _, fail), fail. '$run_at_thread_exit'(Id0) :- recorded('$thread_exit_hook',[Id0|Hook],R), erase(R), catch(once(Hook),_,fail), fail. '$run_at_thread_exit'(_). /** @pred thread_at_exit(: _Term_) Run _Goal_ just before releasing the thread resources. This is to be compared to `at_halt/1`, but only for the current thread. These hooks are ran regardless of why the execution of the thread has been completed. As these hooks are run, the return-code is already available through thread_property/2 using the result of thread_self/1 as thread-identifier. If you want to guarantee the execution of an exit hook no matter how the thread terminates (the thread can be aborted before reaching the thread_at_exit/1 call), consider using instead the `at_exit/1` option of thread_create/3. */ thread_at_exit(Goal) :- '$check_callable'(Goal,thread_at_exit(Goal)), '$thread_self'(Id0), recordz('$thread_exit_hook',[Id0|Goal],_). /** @} */ /** @defgroup Monitoring_Threads Monitoring Threads @ingroup Threads Normal multi-threaded applications should not need these the predicates from this section because almost any usage of these predicates is unsafe. For example checking the existence of a thread before signalling it is of no use as it may vanish between the two calls. Catching exceptions using catch/3 is the only safe way to deal with thread-existence errors. These predicates are provided for diagnosis and monitoring tasks. @{ */ /** @pred current_thread(+ _Id_, - _Status_) Enumerates identifiers and status of all currently known threads. Calling current_thread/2 does not influence any thread. See also thread_join/2. For threads that have an alias-name, this name is returned in _Id_ instead of the numerical thread identifier. _Status_ is one of: + running The thread is running. This is the initial status of a thread. Please note that threads waiting for something are considered running too. + false The _Goal_ of the thread has been completed and failed. + true The _Goal_ of the thread has been completed and succeeded. + exited( _Term_) The _Goal_ of the thread has been terminated using thread_exit/1 with _Term_ as argument. If the underlying native thread has exited (using pthread_exit()) _Term_ is unbound. + exception( _Term_) The _Goal_ of the thread has been terminated due to an uncaught exception (see throw/1 and catch/3). */ current_thread(Id, Status) :- catch(thread_property(Id, status(Status)), error(existence_error(_,_),_), fail). '$thread_id_alias'(Id, Alias) :- recorded('$thread_alias', [Id|Alias], _), !. '$thread_id_alias'(Id, Id). thread_property(Prop) :- '$check_thread_property'(Prop, thread_property(Prop)), '$thread_self'(Id), '$thread_property'(Prop, Id). /** @pred thread_property(? _Id_, ? _Property_) Enumerates the properties of the specified thread. Calling thread_property/2 does not influence any thread. See also thread_join/2. For threads that have an alias-name, this name can be used in _Id_ instead of the numerical thread identifier. _Property_ is one of: + status( _Status_) The thread status of a thread (see below). + alias( _Alias_) The thread alias, if it exists. + at_exit( _AtExit_) The thread exit hook, if defined (not available if the thread is already terminated). + detached( _Boolean_) The detached state of the thread. + stack( _Size_) The thread stack data-area size. + trail( _Size_) The thread trail data-area size. + system( _Size_) The thread system data-area size. */ thread_property(Id, Prop) :- ( nonvar(Id) -> '$check_thread_or_alias'(Id, thread_property(Id, Prop)) ; '$enumerate_threads'(Id) ), '$check_thread_property'(Prop, thread_property(Id, Prop)), '$thread_id_alias'(Id0, Id), '$thread_property'(Prop, Id0). '$enumerate_threads'(Id) :- '$max_threads'(Max), Max1 is Max-1, between(0,Max1,Id), '$thread_stacks'(Id, _, _, _). '$thread_property'(alias(Alias), Id) :- recorded('$thread_alias', [Id|Alias], _). '$thread_property'(status(Status), Id) :- '$mk_tstatus_key'(Id, Key), ( recorded(Key, Exit, _) -> Status = Exit ; Status = running ). '$thread_property'(detached(Detached), Id) :- ( '$thread_detached'(Id,Detached) -> true ; Detached = false ). '$thread_property'(at_exit(M:G), _Id) :- '$thread_run_at_exit'(G,M). '$thread_property'(stack(Stack), Id) :- '$thread_stacks'(Id, Stack, _, _). '$thread_property'(trail(Trail), Id) :- '$thread_stacks'(Id, _, Trail, _). '$thread_property'(system(System), Id) :- '$thread_stacks'(Id, _, _, System). threads :- format(user_error,'------------------------------------------------------------------------~n',[]), format(user_error, '~t~a~48+~n', 'Thread Detached Status'), format(user_error,'------------------------------------------------------------------------~n',[]), thread_property(Id, detached(Detached)), thread_property(Id, status(Status)), '$thread_id_alias'(Id, Alias), format(user_error,'~t~q~30+~33|~w~42|~q~n', [Alias, Detached, Status]), fail. threads :- format(user_error,'------------------------------------------------------------------------~n',[]). '$check_thread_or_alias'(Term, Goal) :- var(Term), !, '$do_error'(instantiation_error, Goal). '$check_thread_or_alias'(Term, Goal) :- \+ integer(Term), \+ atom(Term), !, '$do_error'(domain_error(thread_or_alias, Term), Goal). '$check_thread_or_alias'(Term, Goal) :- atom(Term), \+ recorded('$thread_alias',[_|Term],_), !, '$do_error'(existence_error(thread, Term), Goal). '$check_thread_or_alias'(Term, Goal) :- integer(Term), \+ '$valid_thread'(Term), !, '$do_error'(existence_error(thread, Term), Goal). '$check_thread_or_alias'(_,_). '$check_thread_property'(Term, _) :- var(Term), !. '$check_thread_property'(alias(_), _) :- !. '$check_thread_property'(detached(_), _) :- !. '$check_thread_property'(at_exit(_), _) :- !. '$check_thread_property'(status(_), _) :- !. '$check_thread_property'(stack(_), _) :- !. '$check_thread_property'(trail(_), _) :- !. '$check_thread_property'(system(_), _) :- !. '$check_thread_property'(Term, Goal) :- '$do_error'(domain_error(thread_property, Term), Goal). '$check_mutex_or_alias'(Term, Goal) :- var(Term), !, '$do_error'(instantiation_error, Goal). '$check_mutex_or_alias'(Term, Goal) :- \+ integer(Term), \+ atom(Term), !, '$do_error'(domain_error(mutex_or_alias, Term), Goal). '$check_mutex_or_alias'(Term, Goal) :- atom(Term), \+ recorded('$mutex_alias',[_|Term],_), !, '$do_error'(existence_error(mutex, Term), Goal). '$check_mutex_or_alias'(Term, Goal) :- % integer(Term), \+ '$valid_mutex'(Term), !, integer(Term), \+ recorded('$mutex_alias',[Term|_],_), !, '$do_error'(existence_error(mutex, Term), Goal). '$check_mutex_or_alias'(_,_). '$check_mutex_property'(Term, _) :- var(Term), !. '$check_mutex_property'(alias(_), _) :- !. '$check_mutex_property'(status(Status), Goal) :- !, ( var(Status) -> true ; Status = unlocked -> true ; Status = locked(_, _) -> true ; '$do_error'(domain_error(mutex_property, status(Status)), Goal) ). '$check_mutex_property'(Term, Goal) :- '$do_error'(domain_error(mutex_property, Term), Goal). '$mk_tstatus_key'(Id0, Key) :- atomic_concat('$thread_exit_status__',Id0,Key). /** @pred thread_statistics(+ _Id_, + _Key_, - _Value_) Obtains statistical information on thread _Id_ as `statistics/2` does in single-threaded applications. This call returns all keys of `statistics/2`, although only information statistics about the stacks and CPU time yield different values for each thread. + mutex_statistics Print usage statistics on internal mutexes and mutexes associated with dynamic predicates. For each mutex two numbers are printed: the number of times the mutex was acquired and the number of collisions: the number times the calling thread has to wait for the mutex. The collision-count is not available on Windows as this would break portability to Windows-95/98/ME or significantly harm performance. Generally collision count is close to zero on single-CPU hardware. + threads Prints a table of current threads and their status. */ thread_statistics(Id, Key, Val) :- format("not implemented yet: ~w, ~w, ~w~n",[Id, Key, Val]). %% @} /** @defgroup Signalling_Threads Signalling Threads @ingroup Threadas These predicates provide a mechanism to make another thread execute some goal as an <em>interrupt</em>. Signalling threads is safe as these interrupts are only checked at safe points in the virtual machine. Nevertheless, signalling in multi-threaded environments should be handled with care as the receiving thread may hold a <em>mutex</em> (see with_mutex/2). Signalling probably only makes sense to start debugging threads and to cancel no-longer-needed threads with throw/1, where the receiving thread should be designed carefully do handle exceptions at any point. @} */ /** @defgroup Thread_Synchronisation Thread Synchronisation @ingroup Threads @{ All internal Prolog operations are thread-safe. This implies two Prolog threads can operate on the same dynamic predicate without corrupting the consistency of the predicate. This section deals with user-level <em>mutexes</em> (called <em>monitors</em> in ADA or <em>critical-sections</em> by Microsoft). A mutex is a <em>MUT</em>ual <em>EX</em>clusive device, which implies at most one thread can <em>hold</em> a mutex. Mutexes are used to realise related updates to the Prolog database. With `related', we refer to the situation where a `transaction' implies two or more changes to the Prolog database. For example, we have a predicate `address/2`, representing the address of a person and we want to change the address by retracting the old and asserting the new address. Between these two operations the database is invalid: this person has either no address or two addresses, depending on the assert/retract order. Here is how to realise a correct update: ~~~~~ :- initialization mutex_create(addressbook). change_address(Id, Address) :- mutex_lock(addressbook), retractall(address(Id, _)), asserta(address(Id, Address)), mutex_unlock(addressbook). ~~~~~ */ /** @pred mutex_create(? _MutexId_) Create a mutex. if _MutexId_ is an atom, a <em>named</em> mutex is created. If it is a variable, an anonymous mutex reference is returned. There is no limit to the number of mutexes that can be created. */ mutex_create(Id, Options) :- nonvar(Id), !, '$do_error'(uninstantiation_error(Id), mutex_create(Id, Options)). mutex_create(Id, Options) :- Goal = mutex_create(Id, Options), '$mutex_options'(Options, Alias, Goal), ( atom(Alias) -> mutex_create(Alias) ; mutex_create(Id) ). '$mutex_options'(Var, _, Goal) :- var(Var), !, '$do_error'(instantiation_error, Goal). '$mutex_options'([], _, _) :- !. '$mutex_options'([Option| Options], Alias, Goal) :- !, '$mutex_option'(Option, Alias, Goal), '$mutex_options'(Options, Alias, Goal). '$mutex_options'(Options, _, Goal) :- '$do_error'(type_error(list, Options), Goal). '$mutex_option'(Var, _, Goal) :- var(Var), !, '$do_error'(instantiation_error, Goal). '$mutex_option'(alias(Alias), Alias, Goal) :- !, ( atom(Alias) -> true ; '$do_error'(type_error(atom, Alias), Goal) ). '$mutex_option'(Option, _, Goal) :- '$do_error'(domain_error(mutex_option, Option), Goal). /** @pred mutex_unlock_all Unlock all mutexes held by the current thread. This call is especially useful to handle thread-termination using abort/0 or exceptions. See also thread_signal/2. */ mutex_unlock_all :- '$thread_self'(Tid), '$unlock_all_thread_mutexes'(Tid). '$unlock_all_thread_mutexes'(Tid) :- recorded('$mutex_alias',[Id|_],_), '$mutex_info'(Id, NRefs, Tid), NRefs > 0, '$mutex_unlock_all'(Id), fail. '$unlock_all_thread_mutexes'(_). '$mutex_unlock_all'(Id) :- '$mutex_info'(Id, NRefs, _), NRefs > 0, '$unlock_mutex'(Id), '$mutex_unlock_all'(Id). /** @pred current_mutex(? _MutexId_, ? _ThreadId_, ? _Count_) Enumerates all existing mutexes. If the mutex is held by some thread, _ThreadId_ is unified with the identifier of the holding thread and _Count_ with the recursive count of the mutex. Otherwise, _ThreadId_ is `[]` and _Count_ is 0. */ current_mutex(M, T, NRefs) :- recorded('$mutex_alias',[Id|M],_), '$mutex_info'(Id, NRefs, T). mutex_property(Mutex, Prop) :- ( nonvar(Mutex) -> '$check_mutex_or_alias'(Mutex, mutex_property(Mutex, Prop)) ; recorded('$mutex_alias', [_|Mutex], _) ), '$check_mutex_property'(Prop, mutex_property(Mutex, Prop)), '$mutex_id_alias'(Id, Mutex), '$mutex_property'(Id, Prop). '$mutex_property'(Id, alias(Alias)) :- recorded('$mutex_alias', [Id|Alias], _), Id \= Alias. '$mutex_property'(Id, status(Status)) :- '$mutex_info'(Id, Count, HoldingThread), ( Count =:= 0 -> Status = unlocked ; % Count > 0, '$thread_id_alias'(HoldingThread, Alias), once((Thread = Alias; Thread = HoldingThread)), Status = locked(Thread, Count) ). %% @} /** @defgroup Thread_Communication Thread communication @ingroup Threads Prolog threads can exchange data using dynamic predicates, database records, and other globally shared data. These provide no suitable means to wait for data or a condition as they can only be checked in an expensive polling loop. <em>Message queues</em> provide a means for threads to wait for data or conditions without using the CPU. Each thread has a message-queue attached to it that is identified by the thread. Additional queues are created using `message_queue_create/2`. @{ */ message_queue_create(Id, Options) :- nonvar(Id), !, '$do_error'(uninstantiation_error(Id), message_queue_create(Id, Options)). message_queue_create(Id, Options) :- var(Options), !, '$do_error'(instantiation_error, message_queue_create(Id, Options)). message_queue_create(Id, []) :- !, '$message_queue_create'(Id). message_queue_create(Id, [alias(Alias)]) :- var(Alias), !, '$do_error'(instantiation_error, message_queue_create(Id, [alias(Alias)])). message_queue_create(Id, [alias(Alias)]) :- \+ atom(Alias), !, '$do_error'(type_error(atom,Alias), message_queue_create(Id, [alias(Alias)])). message_queue_create(Id, [alias(Alias)]) :- var(Id), !, ( recorded('$thread_alias', [_|Alias], _) -> '$do_error'(permission_error(create,queue,alias(Alias)),message_queue_create(Alias, [alias(Alias)])) ; '$message_queue_create'(Id), recordz('$thread_alias', [Id|Alias], _) ). message_queue_create(Alias, [alias(Alias)]) :- !, ( recorded('$thread_alias', [_|Alias], _) -> '$do_error'(permission_error(create,queue,alias(Alias)),message_queue_create(Alias, [alias(Alias)])) ; '$message_queue_create'(Alias) ). message_queue_create(Id, [Option| _]) :- '$do_error'(domain_error(queue_option, Option), message_queue_create(Id, [Option| _])). message_queue_create(Id, Options) :- '$do_error'(type_error(list, Options), message_queue_create(Id, Options)). /** @pred message_queue_create(? _Queue_) If _Queue_ is an atom, create a named queue. To avoid ambiguity on `thread_send_message/2`, the name of a queue may not be in use as a thread-name. If _Queue_ is unbound an anonymous queue is created and _Queue_ is unified to its identifier. */ message_queue_create(Id) :- ( var(Id) -> % ISO DTR '$message_queue_create'(Id) ; atom(Id) -> % old behavior '$message_queue_create'(Id) ; '$do_error'(uninstantiation_error(Id), message_queue_create(Id)) ). /** @pred message_queue_destroy(+ _Queue_) Destroy a message queue created with message_queue_create/1. It is <em>not</em> allows to destroy the queue of a thread. Neither is it allowed to destroy a queue other threads are waiting for or, for anonymous message queues, may try to wait for later. */ message_queue_destroy(Name) :- var(Name), !, '$do_error'(instantiation_error,message_queue_destroy(Name)). message_queue_destroy(Alias) :- recorded('$thread_alias', [Id|Alias], Ref), atom(Id), !, '$message_queue_destroy'(Id), erase(Ref). message_queue_destroy(Name) :- atom(Name), '$message_queue_destroy'(Name), recorded('$thread_alias', [Name|_Alias], Ref), erase(Ref), fail. message_queue_destroy(_). /* @pred message_queue_property(+ _Queue_) Report on the alias and number of messages stored in a queue created with message_queue_create/1. + `alias(Alias)` report the alias for stream _S_. It can also be used to enumerate all message queues that have aliases, including anonymous queues. + `size(Size)` unifies _Size_ with the number of messages in the queue. */ message_queue_property( Id, alias(Alias) ) :- recorded('$thread_alias',[Id|Alias],_). message_queue_property( Alias, size(Size) ) :- ground(Alias), recorded('$thread_alias',[Id|Alias],_), '$message_queue_size'(Id, Size). message_queue_property( Id, size(Size) ) :- '$message_queue_size'(Id, Size). /** @pred thread_send_message(+ _Term_) Places _Term_ in the message-queue of the thread running the goal. Any term can be placed in a message queue, but note that the term is copied to the receiving thread and variable-bindings are thus lost. This call returns immediately. */ thread_send_message(Term) :- '$thread_self'(Id), thread_send_message(Id, Term). /** @pred thread_send_message(+ _QueueOrThreadId_, + _Term_) Place _Term_ in the given queue or default queue of the indicated thread (which can even be the message queue of itself (see thread_self/1). Any term can be placed in a message queue, but note that the term is copied to the receiving thread and variable-bindings are thus lost. This call returns immediately. If more than one thread is waiting for messages on the given queue and at least one of these is waiting with a partially instantiated _Term_, the waiting threads are <em>all</em> sent a wakeup signal, starting a rush for the available messages in the queue. This behaviour can seriously harm performance with many threads waiting on the same queue as all-but-the-winner perform a useless scan of the queue. If there is only one waiting thread or all waiting threads wait with an unbound variable an arbitrary thread is restarted to scan the queue. */ thread_send_message(Queue, Term) :- var(Queue), !, '$do_error'(instantiation_error,thread_send_message(Queue,Term)). thread_send_message(Queue, Term) :- recorded('$thread_alias',[Id|Queue],_R), !, '$message_queue_send'(Id, Term). thread_send_message(Queue, Term) :- '$message_queue_send'(Queue, Term). /** @pred thread_get_message(? _Term_) Examines the thread message-queue and if necessary blocks execution until a term that unifies to _Term_ arrives in the queue. After a term from the queue has been unified unified to _Term_, the term is deleted from the queue and this predicate returns. Please note that not-unifying messages remain in the queue. After the following has been executed, thread 1 has the term `gnu` in its queue and continues execution using _A_ is `gnat`. ~~~~~ <thread 1> thread_get_message(a(A)), <thread 2> thread_send_message(b(gnu)), thread_send_message(a(gnat)), ~~~~~ See also thread_peek_message/1. */ thread_get_message(Term) :- '$thread_self'(Id), thread_get_message(Id, Term). /** @pred thread_get_message(+ _Queue_, ? _Term_) As thread_get_message/1, operating on a given queue. It is allowed to peek into another thread's message queue, an operation that can be used to check whether a thread has swallowed a message sent to it. */ thread_get_message(Queue, Term) :- var(Queue), !, '$do_error'(instantiation_error,thread_get_message(Queue,Term)). thread_get_message(Queue, Term) :- recorded('$thread_alias',[Id|Queue],_R), !, '$message_queue_receive'(Id, Term). thread_get_message(Queue, Term) :- '$message_queue_receive'(Queue, Term). /** @pred thread_peek_message(? _Term_) Examines the thread message-queue and compares the queued terms with _Term_ until one unifies or the end of the queue has been reached. In the first case the call succeeds (possibly instantiating _Term_. If no term from the queue unifies this call fails. */ thread_peek_message(Term) :- '$thread_self'(Id), thread_peek_message(Id, Term). /** @pred thread_peek_message(+ _Queue_, ? _Term_) As thread_peek_message/1, operating on a given queue. It is allowed to peek into another thread's message queue, an operation that can be used to check whether a thread has swallowed a message sent to it. Explicit message queues are designed with the <em>worker-pool</em> model in mind, where multiple threads wait on a single queue and pick up the first goal to execute. Below is a simple implementation where the workers execute arbitrary Prolog goals. Note that this example provides no means to tell when all work is done. This must be realised using additional synchronisation. ~~~~~ % create_workers(+Id, +N) % % Create a pool with given Id and number of workers. create_workers(Id, N) :- message_queue_create(Id), forall(between(1, N, _), thread_create(do_work(Id), _, [])). do_work(Id) :- repeat, thread_get_message(Id, Goal), ( catch(Goal, E, print_message(error, E)) -> true ; print_message(error, goal_failed(Goal, worker(Id))) ), fail. % work(+Id, +Goal) % % Post work to be done by the pool work(Id, Goal) :- thread_send_message(Id, Goal). ~~~~~ */ thread_peek_message(Queue, Term) :- var(Queue), !, '$do_error'(instantiation_error,thread_peek_message(Queue,Term)). thread_peek_message(Queue, Term) :- recorded('$thread_alias',[Id|Queue],_R), !, '$message_queue_peek'(Id, Term). tthread_peek_message(Queue, Term) :- '$message_queue_peek'(Queue, Term). %% @} /** @defgroup Signalling_Threads Signalling Threads @ingroup Threadas These predicates provide a mechanism to make another thread execute some goal as an <em>interrupt</em>. Signalling threads is safe as these interrupts are only checked at safe points in the virtual machine. Nevertheless, signalling in multi-threaded environments should be handled with care as the receiving thread may hold a <em>mutex</em> (see with_mutex/2). Signalling probably only makes sense to start debugging threads and to cancel no-longer-needed threads with throw/1, where the receiving thread should be designed carefully do handle exceptions at any point. @{ */ /** @pred thread_sleep(+ _Time_) Make current thread sleep for _Time_ seconds. _Time_ may be an integer or a floating point number. When time is zero or a negative value the call succeeds and returns immediately. This call should not be used if alarms are also being used. */ thread_sleep(Time) :- var(Time), !, '$do_error'(instantiation_error,thread_sleep(Time)). thread_sleep(Time) :- integer(Time), !, ( Time > 0 -> '$thread_sleep'(Time,0,_,_) ; true ). thread_sleep(Time) :- float(Time), !, ( Time > 0.0 -> STime is integer(float_integer_part(Time)), NTime is integer(float_fractional_part(Time))*1000000000, '$thread_sleep'(STime,NTime,_,_) ; true ). thread_sleep(Time) :- '$do_error'(type_error(number,Time),thread_sleep(Time)). thread_signal(Id, Goal) :- '$check_thread_or_alias'(Id, thread_signal(Id, Goal)), '$check_callable'(Goal, thread_signal(Id, Goal)), '$thread_id_alias'(Id0, Id), ( recorded('$thread_signal', [Id0| _], R), erase(R), fail ; true ), recorda('$thread_signal', [Id0| Goal], _), '$signal_thread'(Id0). '$thread_gfetch'(G) :- '$thread_self'(Id), recorded('$thread_signal',[Id|G],R), erase(R). %% @} /** @defgroup Threads_and_Dynamic_Predicates Threads and Dynamic Predicates @ingroup Threads @{ Besides queues threads can share and exchange data using dynamic predicates. The multi-threaded version knows about two types of dynamic predicates. By default, a predicate declared <em>dynamic</em> (see dynamic/1) is shared by all threads. Each thread may assert, retract and run the dynamic predicate. Synchronisation inside Prolog guarantees the consistency of the predicate. Updates are <em>logical</em>: visible clauses are not affected by assert/retract after a query started on the predicate. In many cases primitive from thread synchronisation should be used to ensure application invariants on the predicate are maintained. Besides shared predicates, dynamic predicates can be declared with the thread_local/1 directive. Such predicates share their attributes, but the clause-list is different in each thread. */ /** @pred thread_local( _+Functor/Arity_) related to the dynamic/1 directive. It tells the system that the predicate may be modified using assert/1, retract/1, etc, during execution of the program. Unlike normal shared dynamic data however each thread has its own clause-list for the predicate. As a thread starts, this clause list is empty. If there are still clauses as the thread terminates these are automatically reclaimed by the system. The `thread_local` property implies the property `dynamic`. Thread-local dynamic predicates are intended for maintaining thread-specific state or intermediate results of a computation. It is not recommended to put clauses for a thread-local predicate into a file as in the example below as the clause is only visible from the thread that loaded the source-file. All other threads start with an empty clause-list. ~~~~~ :- thread_local foo/1. foo(gnat). ~~~~~ */ thread_local(X) :- '$current_module'(M), '$thread_local'(X,M). '$thread_local'(X,M) :- var(X), !, '$do_error'(instantiation_error,thread_local(M:X)). '$thread_local'(Mod:Spec,_) :- !, '$thread_local'(Spec,Mod). '$thread_local'([], _) :- !. '$thread_local'([H|L], M) :- !, '$thread_local'(H, M), '$thread_local'(L, M). '$thread_local'((A,B),M) :- !, '$thread_local'(A,M), '$thread_local'(B,M). '$thread_local'(X,M) :- !, '$thread_local2'(X,M). '$thread_local2'(A/N, Mod) :- integer(N), atom(A), !, functor(T,A,N), (Mod \= idb -> '$predicate_flags'(T,Mod,F,F) ; true), ( '$install_thread_local'(T,Mod) -> true ; F /\ 0x08002000 =\= 0 -> '$do_error'(permission_error(modify,dynamic_procedure,A/N),thread_local(Mod:A/N)) ; '$do_error'(permission_error(modify,static_procedure,A/N),thread_local(Mod:A/N)) ). '$thread_local2'(X,Mod) :- '$do_error'(type_error(callable,X),thread_local(Mod:X)). %% @} /** @} */