diff --git a/pl/threads.yap b/pl/threads.yap index fa71bf61e..ccafc604a 100644 --- a/pl/threads.yap +++ b/pl/threads.yap @@ -33,7 +33,7 @@ recorda('$thread_defaults', [0, 0, 0, false, true], _), '$new_mutex'(QId), assert('$global_queue_mutex'(QId)), - '$create_mq'(0), + '$create_thread_mq'(0), '$new_mutex'(Id), assert('$with_mutex_mutex'(Id)). @@ -54,18 +54,16 @@ '$close_thread'('$thread_finished'(Status), Detached, Id0) :- !, '$run_at_thread_exit'(Id0), - (Detached == true -> - true - ; - recorda('$thread_exit_status', [Id0|Status], _) + ( Detached == true -> + '$erase_thread_info'(Id0) + ; recorda('$thread_exit_status', [Id0|Status], _) ). % format(user_error,'closing thread ~w~n',[v([Id0|Status])]). -'$close_thread'(Exception,Detached) :- +'$close_thread'(Exception, Detached) :- '$run_at_thread_exit'(Id0), - (Detached == true -> - true - ; - recorda('$thread_exit_status', [Id0|exception(Exception)], _) + ( Detached == true -> + '$erase_thread_info'(Id0) + ; recorda('$thread_exit_status', [Id0|exception(Exception)], _) ). thread_create(Goal) :- @@ -75,7 +73,7 @@ thread_create(Goal) :- '$thread_new_tid'(Id), '$erase_thread_info'(Id), '$record_thread_info'(Id, [Stack, Trail, System], Detached, AtExit), - '$create_mq'(Id), + '$create_thread_mq'(Id), '$create_thread'(Goal, Stack, Trail, System, Detached, Id). thread_create(Goal, OutId) :- @@ -86,7 +84,7 @@ thread_create(Goal, OutId) :- '$thread_new_tid'(Id), '$erase_thread_info'(Id), '$record_thread_info'(Id, [Stack, Trail, System], Detached, AtExit), - '$create_mq'(Id), + '$create_thread_mq'(Id), '$create_thread'(Goal, Stack, Trail, System, Detached, Id), OutId = Id. @@ -101,7 +99,7 @@ thread_create(Goal, OutId, Options) :- '$record_thread_info'(Id, [Stack, Trail, System], Detached, AtExit) ; '$record_thread_info'(Id, Alias, [Stack, Trail, System], Detached, AtExit, G0) ), - '$create_mq'(Id), + '$create_thread_mq'(Id), '$create_thread'(Goal, Stack, Trail, System, Detached, Id), OutId = Id. @@ -540,7 +538,8 @@ mutex_property(Mutex, Prop) :- once((Thread = Alias; Thread = HoldingThread)), Status = locked(Thread, Count) ). -/* + + message_queue_create(Id, Options) :- nonvar(Id), !, '$do_error'(type_error(variable, Id), message_queue_create(Id, Options)). @@ -551,7 +550,7 @@ message_queue_create(Id, []) :- !, '$global_queue_mutex'(QMutex), '$new_mutex'(Mutex), '$cond_create'(Cond), - '$thread_new_qid'(Id), + '$mq_new_id'(Id), recorda('$queue',q(Id,Mutex,Cond,Id), _), '$unlock_mutex'(QMutex). message_queue_create(Id, [alias(Alias)]) :- @@ -565,14 +564,14 @@ message_queue_create(Id, [alias(Alias)]) :- !, '$lock_mutex'(QMutex), '$new_mutex'(Mutex), '$cond_create'(Cond), - '$thread_new_qid'(Id), ( recorded('$queue', q(Alias,_,_,_), _) -> '$unlock_mutex'(QMutex), '$do_error'(permission_error(create,queue,alias(Alias)),message_queue_create(Id, [alias(Alias)])) - ; recorded('$thread_alias', [_,Alias], _) -> + ; recorded('$thread_alias', [_|Alias], _) -> '$unlock_mutex'(QMutex), '$do_error'(permission_error(create,queue,alias(Alias)),message_queue_create(Id, [alias(Alias)])) - ; recorda('$queue',q(Alias,Mutex,Cond,Id), _), + ; '$mq_new_id'(Id), + recorda('$queue',q(Alias,Mutex,Cond,Id), _), '$unlock_mutex'(QMutex) ). message_queue_create(Id, [Option| _]) :- @@ -585,50 +584,21 @@ message_queue_create(Id) :- message_queue_create(Id, []) ; atom(Id) -> % old behavior message_queue_create(_, [alias(Id)]) - ; '$do_error'(type_error(variable, Id), message_queue_create(Id)). + ; '$do_error'(type_error(variable, Id), message_queue_create(Id)) ). -*/ -message_queue_create(_, [alias(Alias)]) :- % TEMPORARY FIX - message_queue_create(Alias). -message_queue_create(Cond) :- - var(Cond), !, - '$create_mq'(Cond). -message_queue_create(Name) :- - atom(Name), - recorded('$thread_alias',[_,Name],_), !, - '$do_error'(permission_error(create,message_queue,Name),message_queue_create(Name)). -message_queue_create(Name) :- - atom(Name), !, - '$create_mq'(Name). -message_queue_create(Name) :- - '$do_error'(type_error(atom,Name),message_queue_create(Name)). - -'$create_mq'(Name) :- +'$create_thread_mq'(Tid) :- + '$global_queue_mutex'(QMutex), '$new_mutex'(Mutex), '$cond_create'(Cond), - '$global_queue_mutex'(QMutex), - '$lock_mutex'(QMutex), - '$mq_iname'(Name, CName), - ( recorded('$queue',q(Name,_,_, _),_) -> - '$unlock_mutex'(QMutex), - '$do_error'(permission_error(create,message_queue,Name),message_queue_create(Name)) - ; - recorda('$queue',q(Name,Mutex,Cond, CName),_), - '$unlock_mutex'(QMutex) - ). + '$mq_new_id'(Id), + recorda('$queue', q(Tid,Mutex,Cond,Id), _), + '$unlock_mutex'(QMutex). -'$mq_iname'(I,X) :- - integer(I), !, - atomic_concat('$MQ_NAME_KEY_',I,X). -'$mq_iname'(A,X) :- - var(A), !, +'$mq_new_id'('$message_queue'(I)) :- '$integers'(I), - atomic_concat(message_queue_,I,A), - atomic_concat('$MQ_NAME_KEY_',A,X), - \+ recorded('$queue',q(A,_,_, X),_), !. -'$mq_iname'(A,X) :- - atom_concat('$MQ_NAME_KEY_',A,X). + \+ recorded('$queue', q(_,_,_,'$message_queue'(I)), _), + !. '$integers'(0). '$integers'(I) :- @@ -662,6 +632,48 @@ message_queue_destroy(Name) :- fail. '$clean_mqueue'(_). + +message_queue_property(Id, Prop) :- + ( nonvar(Id) -> + '$check_message_queue_or_alias'(Id, message_queue_property(Id, Prop)) + ; recorded('$queue', q(Id,_,_,_), _) + ), + '$check_message_queue_property'(Prop, message_queue_property(Id, Prop)), + '$message_queue_id_alias'(Id0, Id), + '$message_queue_property'(Id0, Prop). + +'$check_message_queue_or_alias'(Term, Goal) :- + var(Term), !, + '$do_error'(instantiation_error, Goal). +'$check_message_queue_or_alias'(Term, Goal) :- + \+ atom(Term), + Term \= '$message_queue'(_), !, + '$do_error'(domain_error(queue_or_alias, Term), Goal). +'$check_message_queue_or_alias'('$message_queue'(I), Goal) :- + \+ recorded('$queue', q(_,_,_,'$message_queue'(I)), _), !, + '$do_error'(existence_error(queue, '$message_queue'(I)), Goal). +'$check_message_queue_or_alias'(Term, Goal) :- + atom(Term), + \+ recorded('$queue', q(Term,_,_,_), _), !, + '$do_error'(existence_error(queue, Term), Goal). +'$check_message_queue_or_alias'(_, _). + +'$message_queue_id_alias'(Id, Alias) :- + recorded('$queue', q(Alias,_,_,Id), _), !. +'$message_queue_id_alias'(Id, Id). + +'$check_message_queue_property'(Term, _) :- + var(Term), !. +'$check_message_queue_property'(alias(_), _) :- !. +'$check_message_queue_property'(size(_), _) :- !. +'$check_message_queue_property'(max_size(_), _) :- !. +'$check_message_queue_property'(Term, Goal) :- + '$do_error'(domain_error(queue_property, Term), Goal). + +'$message_queue_property'(Id, alias(Alias)) :- + recorded('$queue', q(Alias,Mutex,Cond,Id), _). + + thread_send_message(Term) :- '$thread_self'(Id), thread_send_message(Id, Term). @@ -683,7 +695,7 @@ thread_send_message(Queue, Term) :- thread_send_message(Queue, Term) :- '$global_queue_mutex'(QMutex), '$unlock_mutex'(QMutex), - '$do_error'(existence_error(message_queue,Queue),thread_send_message(Queue,Term)). + '$do_error'(existence_error(queue,Queue),thread_send_message(Queue,Term)). thread_get_message(Term) :- '$thread_self'(Id),