420 lines
9.7 KiB
Prolog
420 lines
9.7 KiB
Prolog
/* $Id$
|
|
|
|
Part of SWI-Prolog
|
|
|
|
Author: Jan Wielemaker
|
|
E-mail: jan@swi.psy.uva.nl
|
|
WWW: http://www.swi-prolog.org
|
|
Copyright (C): 1985-2002, University of Amsterdam
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; either version 2
|
|
of the License, or (at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Lesser General Public
|
|
License along with this library; if not, write to the Free Software
|
|
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
|
|
As a special exception, if you link this library with other files,
|
|
compiled with a Free Software compiler, to produce an executable, this
|
|
library does not by itself cause the resulting executable to be covered
|
|
by the GNU General Public License. This exception does not however
|
|
invalidate any other reasons why the executable file might be covered by
|
|
the GNU General Public License.
|
|
*/
|
|
|
|
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Stress-testing sockets, message queues and threads. These elements are
the most important and complicated parts of multi-threaded servers and
require intensive testing. This program performs the following steps for
each test run:

	* Create a server consisting of an accepting thread and 5 workers.
	* Run the tests from test_def.
	* Destroy the server.

For simple tests, get yourself a free machine and run

	?- forever.

It creates a logfile named <host>-forever.txt
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
|
|
|
|
:- asserta(user:file_search_path(foreign, '.')).
|
|
:- use_module(socket).
|
|
:- use_module(library(debug)).
|
|
|
|
%   Dynamic state shared between the threads of this file:
%
%     on_thread(Thread, Data)  asserted by handle/2 while debugging(thread)
%     port(Port)               asserted by server/1 after binding its socket
%     worker(ThreadId)         one fact per pool thread, see create_pool/1

:- dynamic on_thread/2, port/1, worker/1.

%   The clauses of test_impl/1 are spread over several sections below.

:- discontiguous test_impl/1.
|
|
|
|
/*******************************
|
|
* FOREVER *
|
|
*******************************/
|
|
|
|
%   forever.
%   forever(?Test)
%
%   Log the session to <host>-forever.txt using protocol/1 and run
%   test(Test) ten million times, printing a numbered and time-stamped
%   banner before each run.  The loop is failure driven, so a failing
%   run does not stop it.  forever/0 runs all tests.

forever :-
    forever(_).

forever(Test) :-
    (   gethostname(Host),
        atomic_list_concat([Host, '-', 'forever.txt'], LogFile),
        protocol(LogFile),
        between(1, 10000000, Run),
        get_time(Now),
        convert_time(Now, Stamp),
        format('***** TEST RUN ~w ***** [~w]~n', [Run, Stamp]),
        test(Test),
        fail                            % next run
    ;   true                            % takes very long
    ).
|
|
|
|
/*******************************
|
|
* TOPLEVEL *
|
|
*******************************/
|
|
|
|
%   test.
%   test(?Id)
%
%   Start a server with 5 workers, run every test_def/2 entry whose
%   identifier unifies with Id and stop the server again.  test/0 runs
%   all tests.  Entries of the form concurrent(Times, Threads, Test)
%   are executed through concurrent/3; all others run directly.
%
%   The tests run inside call_cleanup/2 so that stop_server/0 is also
%   called when a test raises an exception.  Without it the `server`
%   thread, the `queue` message queue and the port/1 fact would be left
%   behind.

test :-
    test(_).

test(T) :-
    create_server(5),
    call_cleanup(( test_def(T, Test),
                   (   Test = concurrent(Times, Threads, CTest)
                   ->  run(concurrent(Times, Threads, test_impl(CTest)))
                   ;   run(test_impl(Test))
                   ),
                   fail                 % failure driven: next test_def/2
                 ; true
                 ),
                 stop_server).
|
|
|
|
%   test_def(?Id, ?Test)
%
%   Table of the tests executed by test/1.  Test is either a term
%   handled by test_impl/1 or concurrent(Times, Threads, Test), which
%   test/1 runs through concurrent/3.
%
%   NOTE(review): the identifier c1 appears twice, so test(c1) runs both
%   concurrent tests.  This may be a typo for c2 -- confirm before
%   renaming.

% single client
test_def(s1, echo(100)).
test_def(s2, big(100000)).
test_def(s3, big(10000, 10)).
test_def(s5, timeout).
test_def(s6, nohost).
test_def(s7, noport).
% concurrent clients
test_def(c1, concurrent(10, 3, echo(100))).
test_def(c1, concurrent(10, 3, big(10000))).
|
|
|
|
|
|
/*******************************
|
|
* SERVER *
|
|
*******************************/
|
|
|
|
%   create_server.
%   create_server(+Workers)
%
%   Start server/1 in a thread with alias `server` and block until it
%   posts the message `ready` to the calling thread.  create_server/0
%   uses 5 workers.

create_server :-
    create_server(5).

create_server(Workers) :-
    ThreadOptions = [alias(server)],
    thread_create(server(Workers), _ServerId, ThreadOptions),
    thread_get_message(ready),          % sent by server/1 once listening
    debug(tcp, 'Server started', []).
|
|
|
|
%   stop_server
%
%   Send one `exit` request per registered worker/1 fact, make one more
%   (empty) connection so that the accept call in server/1 returns, and
%   join the `server` thread.

stop_server :-
    \+ ( worker(_),
         \+ client(exit, _)
       ),
    debug(tcp, 'Workers exited, prepare to join server', []),
    connect,                            % make accept return
    thread_join(server).
|
|
|
|
%   server(+Workers)
%
%   Body of the thread created by create_server/1.  Creates the worker
%   pool, binds a socket (Port is unbound at the call to tcp_bind/2, so
%   presumably the system picks a free port), publishes the port as
%   port/1 and tells thread `main` that the server is ready.
%
%   Every accepted connection is posted on the message queue `queue`
%   for as long as a worker/1 fact exists.  The first connection
%   accepted after all workers are gone is closed and ends the loop,
%   after which the pool and socket are cleaned up and port/1 is
%   retracted.

server(Workers) :-
    create_pool(Workers),
    tcp_socket(Socket),
    tcp_bind(Socket, Port),
    assert(port(Port)),
    tcp_listen(Socket, 5),
    debug(tcp, 'Server ready on port ~w', [Port]),
    thread_send_message(main, ready),   % create_server/1 waits for this
    repeat,
        tcp_accept(Socket, Client, Peer),
        debug(connect, 'Connection from ~w', [Peer]),
        (   worker(_)
        ->  thread_send_message(queue, Client),
            fail                        % back to repeat: accept the next
        ;   !,                          % no workers left: leave the loop
            tcp_close_socket(Client)
        ),
    debug(tcp, 'All workers have gone, stopping', []),
    stop_pool,
    tcp_close_socket(Socket),
    retract(port(Port)).
|
|
|
|
%   create_pool(+Number)
%
%   Create the message queue `queue` and start Number threads running
%   work(queue).  Each thread identifier is recorded as worker(Id).

create_pool(Number) :-
    message_queue_create(queue),
    forall(between(1, Number, _),
           create_pool_worker(queue)).

%   create_pool_worker(+Queue)
%
%   Start one thread running work(Queue) and register it in worker/1.

create_pool_worker(Queue) :-
    thread_create(work(Queue), ThreadId, []),
    assert(worker(ThreadId)).
|
|
|
|
|
|
%   stop_pool
%
%   Post `stop` on `queue` once for every worker/1 fact, then retract
%   and join each registered worker and destroy the queue.

stop_pool :-
    \+ ( worker(_),
         \+ thread_send_message(queue, stop)
       ),
    \+ ( retract(worker(ThreadId)),
         \+ thread_join(ThreadId)
       ),
    message_queue_destroy(queue),
    debug(tcp, 'Pool stopped', []).
|
|
|
|
%   work(+Queue)
%
%   Main loop of a pool thread: take the next message from Queue, pass
%   it to handle/2 and stop as soon as the request that was read is the
%   atom `exit`.  Any other request, or a failing handle/2, makes the
%   thread wait for the next message.

work(Queue) :-
    thread_get_message(Queue, Socket),
    (   handle(Socket, Request),
        Request == exit
    ->  true
    ;   work(Queue)
    ).
|
|
|
|
%   handle(+Client, -Data)
%
%   Serve one connection: open the streams of the accepted socket
%   Client, read a single term Data from it, compute the answer using
%   do_work/2 and write the answer back as a quoted term followed by
%   '.\n'.
%
%   Both streams are closed using call_cleanup/2, so they are also
%   released when read/2 raises (e.g. a syntax error) or when do_work/2
%   fails or raises on a request it does not know.

handle(Client, Data) :-
    tcp_open_socket(Client, In, Out),
    call_cleanup(handle_streams(In, Out, Data),
                 close(Out)).

%   handle_streams(+In, +Out, -Data)
%
%   Read the request Data from In, close In and write the reply to Out.
%   Closing Out is left to handle/2.

handle_streams(In, Out, Data) :-
    call_cleanup(read(In, Data),
                 close(In)),
%   open_null_stream(Null),
%   copy_stream_data(In, Null),
%   close(Null),
    thread_self(Me),
    debug(data, '~p: Received ~W', [Me, Data, [max_depth(10)]]),
    (   debugging(thread)
    ->  assert(on_thread(Me, Data))     % reported by on_threads/0
    ;   true
    ),
    do_work(Data, Answer),
    writeq(Out, Answer),
    write(Out, '.\n'),
    debug(data, 'Sent ~W', [Answer, [max_depth(10)]]),
    flush_output(Out).
|
|
|
|
%   thread_join(+Id)
%
%   Join thread Id using thread_join/2 and print a diagnostic line if
%   its exit status is anything but `true`.  Always succeeds once the
%   join itself succeeded.

thread_join(Id) :-
    thread_join(Id, ExitStatus),
    (   ExitStatus \== true
    ->  format('WRONG join-status for ~w: ~w~n', [Id, ExitStatus])
    ;   true
    ).
|
|
|
|
|
|
%   do_work(+Request, -Answer)
%
%   Compute the Answer for a Request read by handle/2:
%
%     exit         detach the calling thread, retract its worker/1 fact
%                  and answer `true`
%     echo(Term)   answer Term itself
%     sleep(Secs)  sleep for Secs and answer `true`
%
%   Fails for any other request.

do_work(exit, true) :-
    !,
    thread_self(Self),
    debug(tcp, 'Exit worker ~w', [Self]),
    thread_detach(Self),
    retract(worker(Self)).
do_work(echo(Term), Term) :-
    !.
do_work(sleep(Seconds), true) :-
    !,
    sleep(Seconds).
|
|
|
|
|
|
/*******************************
|
|
* CLIENT *
|
|
*******************************/
|
|
|
|
%   host(?Host)
%
%   Host on which the test server is contacted.

host(localhost).

%   client(+Term, -Reply)
%   client(+Term, -Reply, +TimeOut)
%
%   Send Term to the server at host/1 and port/1 and unify Reply with
%   the term it sends back.  TimeOut is passed on to client/5; the
%   two-argument version passes 0.

client(Term, Reply) :-
    host(Host),
    port(Port),
    client(Host, Port, 0, Term, Reply).

client(Term, Reply, TimeOut) :-
    host(Host),
    port(Port),
    client(Host, Port, TimeOut, Term, Reply).
|
|
%   client(+Host, +Port, +Timeout, +Term, -Reply)
%
%   Connect to Host:Port, send Term (or each element of Term if it is a
%   list, see send_output/2), close the output side and read one term
%   Reply.  If Timeout is not 0 it is set as timeout on the input stream
%   before reading.
%
%   Resources are released on all paths: the socket is closed if
%   tcp_connect/2 raises, and the streams are closed using
%   call_cleanup/2 when sending, set_stream/2 or read/2 fails or raises.

client(Host, Port, Timeout, Term, Reply) :-
    tcp_socket(Socket),
    catch(tcp_connect(Socket, Host:Port),
          Error,
          ( tcp_close_socket(Socket),   % do not leak the socket
            throw(Error)
          )),
    tcp_open_socket(Socket, In, Out),
    call_cleanup(client_exchange(Term, Timeout, In, Out, Reply),
                 close(In)),
    debug(data, 'Reply = ~W', [Reply, [max_depth(10)]]).

%   client_exchange(+Term, +Timeout, +In, +Out, -Reply)
%
%   Send Term over Out, close Out and read Reply from In.  Closing In
%   is left to client/5.

client_exchange(Term, Timeout, In, Out, Reply) :-
    call_cleanup(send_output(Term, Out),
                 close(Out)),
    (   Timeout == 0
    ->  true
    ;   set_stream(In, timeout(Timeout))
    ),
    read(In, Reply).
|
|
|
|
%   send_output(+Data, +Out)
%
%   Write Data to Out.  A proper list is sent as a sequence of terms,
%   one per element; anything else is sent as a single term.

send_output(Data, Out) :-
    (   is_list(Data)
    ->  send_list_output(Data, Out)
    ;   send_term(Data, Out)
    ).
|
|
|
|
%   send_term(+Term, +Out)
%
%   Write Term quoted to Out, followed by a full stop and a newline so
%   that the other side can read it back using read/2.

send_term(Term, Out) :-
    Terminator = '.\n',
    writeq(Out, Term),
    write(Out, Terminator).
|
|
|
|
%   send_list_output(+Terms, +Out)
%
%   Send every element of the list Terms to Out using send_term/2, in
%   list order.

send_list_output([], _Out).
send_list_output([Term|Rest], Out) :-
    send_term(Term, Out),
    send_list_output(Rest, Out).
|
|
|
|
%   echo(+Term)
%
%   Send echo(Term) to the server and assert that the reply is a
%   variant of Term.

echo(Term) :-
    Request = echo(Term),
    client(Request, Reply),
    assertion(Term =@= Reply).
|
|
|
|
%   connect
%
%   Open a connection to the server at host/1 and port/1 and close it
%   again without sending anything.  stop_server/0 uses this to make
%   the accept call in server/1 return.
%
%   The socket is closed through call_cleanup/2, so it is also released
%   when tcp_connect/2 raises.

connect :-
    host(Host),
    port(Port),
    tcp_socket(Socket),
    call_cleanup(tcp_connect(Socket, Host:Port),
                 tcp_close_socket(Socket)).
|
|
|
|
|
|
/*******************************
|
|
* TESTS *
|
|
*******************************/
|
|
|
|
%   echo(Times)
%
%   Send a lot of short messages to the server: the integers 1..Times
%   are echoed one at a time using echo/1.

test_impl(echo(Times)) :-
    !,
    \+ ( between(1, Times, Message),
         \+ echo(Message)
       ).
|
|
|
|
%   big(Size)
%
%   Send the list [1, ..., Size] to the server as a single message and
%   check that it is echoed back.

test_impl(big(Size)) :-
    findall(I, between(1, Size, I), Numbers),
    echo(Numbers).
|
|
|
|
%   big(Size, Times)
%
%   Send the list [1, ..., Size] to the server Times times.  The list
%   is built once and reused for every round.

test_impl(big(Size, Times)) :-
    findall(I, between(1, Size, I), Numbers),
    \+ ( between(1, Times, _),
         \+ echo(Numbers)
       ).
|
|
|
|
%   timeout
%
%   Ask the server to sleep for 2 seconds while reading the reply with
%   a timeout of 1 second, and assert that this raises a timeout error
%   on read.
%
%   nonvar(E) is asserted first: without it a request that does not
%   time out leaves E unbound and the unification below would pass
%   vacuously.  The stream argument of timeout_error/2 is not matched,
%   because stream handles are not '$stream'(_) terms in SWI-Prolog
%   versions that represent them as blobs.

test_impl(timeout) :-
    catch(client(sleep(2), _Reply, 1), E, true),
    assertion(nonvar(E)),
    assertion(E = error(timeout_error(read, _Stream), _)).
|
|
|
|
|
|
/*******************************
|
|
* RUN *
|
|
*******************************/
|
|
|
|
:- meta_predicate
    run(:).

%   run(:Goal)
%
%   Run Goal, printing a debug message on topic `run` before it starts
%   and, through cleanup/2, when it finishes.  Uses
%   setup_call_catcher_cleanup/4 with an empty setup, which replaces
%   the deprecated call_cleanup/3.

run(Goal) :-
    debug(run, ' ** Run ~p ...', [Goal]),
    setup_call_catcher_cleanup(true,
                               Goal,
                               E,
                               cleanup(Goal, E)).

%   cleanup(+Goal, +E)
%
%   Report on debug topic `run` that Goal finished; E is the catcher
%   term describing how it finished.

cleanup(Goal, E) :-
    debug(run, ' ** Finished ~p: ~p', [Goal, E]).
|
|
|
|
|
|
/*******************************
|
|
* SPECIAL TESTS *
|
|
*******************************/
|
|
|
|
%   nohost
%
%   Connect to the host 'foo.bar' and assert that this raises a socket
%   error with the message 'Host not found'.  The socket is closed
%   before the check.

test_impl(nohost) :-
    tcp_socket(Socket),
    Address = 'foo.bar':80,
    catch(tcp_connect(Socket, Address), E, true),
    tcp_close_socket(Socket),
    assertion(E =@= error(socket_error('Host not found'), _)).
|
|
|
|
%   noport
%
%   Connect to localhost:4321, giving up after 5 seconds, and assert
%   that this raises a socket error.  The test presumably relies on
%   nothing listening on that port.
%
%   nonvar(E) is asserted first: without it a connect that unexpectedly
%   succeeds leaves E unbound and the unification below would pass
%   vacuously.

test_impl(noport) :-
    tcp_socket(S),
    catch(call_with_time_limit(5, tcp_connect(S, localhost:4321)),
          E, true),
    tcp_close_socket(S),
    assertion(nonvar(E)),
    assertion(E = error(socket_error(_), _)).
|
|
|
|
|
|
/*******************************
|
|
* DISTRIBUTION *
|
|
*******************************/
|
|
|
|
%   on_threads
%
%   Print, for every thread that has on_thread/2 facts, how many
%   requests were recorded for it.  Facts are only recorded by handle/2
%   while the debug topic `thread` is enabled.

on_threads :-
    forall(bagof(Data, on_thread(Thread, Data), Handled),
           ( length(Handled, Count),
             format('Thread ~w handled ~w~n', [Thread, Count])
           )).
|
|
|
|
%   clean
%
%   Forget all on_thread/2 records.

clean :-
    retractall(on_thread(_Thread, _Data)).
|
|
|
|
|
|
/*******************************
|
|
* CONCUR *
|
|
*******************************/
|
|
|
|
:- meta_predicate
    concurrent(+, +, :),
    ok(:).

%   concurrent(+Times, +Threads, :Goal)
%
%   Run Goal Times times, spread over Threads threads.  With a single
%   thread the goal simply runs Times times in the calling thread.
%   Otherwise Threads detached workers are started, Times goal(G)
%   messages are queued for them, and the caller waits for one `done`
%   reply per goal.  One `done` message per worker then asks the
%   workers to finish; each acknowledges with a final `done` before
%   both queues are destroyed.

concurrent(Times, 1, Goal) :-
    !,
    forall(between(1, Times, _),
           ok(Goal)).
concurrent(Times, Threads, Goal) :-
    strip_module(Goal, Module, Plain),
    message_queue_create(Replies),
    message_queue_create(Jobs),
    create_workers(Threads, Module, Jobs, Replies),
    concurrent_post(Times, Jobs, goal(Plain)),
    debug(concurrent, 'Waiting for ~w replies', [Times]),
    wait_n(Times, Replies),
    concurrent_post(Threads, Jobs, done),       % ask the workers to stop
    debug(concurrent, 'Waiting for ~w threads', [Threads]),
    wait_n(Threads, Replies),
    message_queue_destroy(Jobs),
    message_queue_destroy(Replies),
    debug(concurrent, 'All done', []).

%   concurrent_post(+Count, +Queue, +Message)
%
%   Post Message on Queue Count times.

concurrent_post(Count, Queue, Message) :-
    forall(between(1, Count, _),
           thread_send_message(Queue, Message)).
|
|
|
|
%   wait_n(+N, +Queue)
%
%   Block until N `done` messages have been taken from Queue.

wait_n(N, Queue) :-
    (   N == 0
    ->  true
    ;   thread_get_message(Queue, done),
        Remaining is N - 1,
        wait_n(Remaining, Queue)
    ).
|
|
|
|
%   create_workers(+N, +Module, +Queue, +Done)
%
%   Start N detached threads, each running worker(Module, Queue, Done).
%   Succeeds without creating anything if N is not positive.

create_workers(N, Module, Queue, Done) :-
    (   N > 0
    ->  thread_create(worker(Module, Queue, Done),
                      _ThreadId,
                      [ detached(true)
                      ]),
        Left is N - 1,
        create_workers(Left, Module, Queue, Done)
    ;   true
    ).
|
|
|
|
|
|
%   worker(+Module, +Queue, +Done)
%
%   Loop of a thread started by create_workers/4.  Each goal(Goal)
%   message taken from Queue is run as Module:Goal through ok/1 and
%   acknowledged with `done` on the Done queue.  Any other message ends
%   the loop after a final `done` acknowledgement.

worker(Module, Queue, Done) :-
    thread_get_message(Queue, Message),
    (   Message \= goal(_)
    ->  thread_send_message(Done, done)         % asked to stop
    ;   Message = goal(Goal),
        ok(Module:Goal),
        thread_send_message(Done, done),
        worker(Module, Queue, Done)
    ).
|
|
|
|
%   ok(:Goal)
%
%   Run Goal once and always succeed.  An exception raised by Goal is
%   printed as an error message; failure of Goal is printed as the
%   warning failed(Goal).

ok(Goal) :-
    catch(Goal, Error, true),
    !,
    (   nonvar(Error)
    ->  print_message(error, Error)
    ;   true
    ).
ok(Goal) :-
    print_message(warning, failed(Goal)).
|
|
|
|
/*******************************
|
|
* DEBUG *
|
|
*******************************/
|
|
|
|
%   Debug topics enabled when this file is loaded.  Uncomment the
%   remaining directives to enable them as well.

%:- interactor.
:- debug(tcp).
%:- debug(data).
:- debug(run).
:- debug(concurrent).
|