clib package
This commit is contained in:
419
packages/clib/stresssocket.pl
Normal file
419
packages/clib/stresssocket.pl
Normal file
@@ -0,0 +1,419 @@
|
||||
/* $Id$
|
||||
|
||||
Part of SWI-Prolog
|
||||
|
||||
Author: Jan Wielemaker
|
||||
E-mail: jan@swi.psy.uva.nl
|
||||
WWW: http://www.swi-prolog.org
|
||||
Copyright (C): 1985-2002, University of Amsterdam
|
||||
|
||||
This program is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU General Public License
|
||||
as published by the Free Software Foundation; either version 2
|
||||
of the License, or (at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public
|
||||
License along with this library; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
|
||||
As a special exception, if you link this library with other files,
|
||||
compiled with a Free Software compiler, to produce an executable, this
|
||||
library does not by itself cause the resulting executable to be covered
|
||||
by the GNU General Public License. This exception does not however
|
||||
invalidate any other reasons why the executable file might be covered by
|
||||
the GNU General Public License.
|
||||
*/
|
||||
|
||||
/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
||||
Stress-testing sockets, message queues and threads. These elements are
|
||||
the most important and complicated parts of multi-threaded servers and
|
||||
require intensive testing. This program performs the following steps for
|
||||
each test:
|
||||
|
||||
* Create a server consisting of an accepting thread and 5 workers.
|
||||
* Run the tests from test_def.
|
||||
* Destroy the server
|
||||
|
||||
For simple tests, get yourself a free machine and run
|
||||
|
||||
?- forever.
|
||||
|
||||
It creates a logfile named <host>-forever.txt
|
||||
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
|
||||
|
||||
:- asserta(user:file_search_path(foreign, '.')).
|
||||
:- use_module(socket).
|
||||
:- use_module(library(debug)).
|
||||
|
||||
:- dynamic
|
||||
on_thread/2,
|
||||
port/1,
|
||||
worker/1.
|
||||
|
||||
:- discontiguous
|
||||
test_impl/1.
|
||||
|
||||
/*******************************
|
||||
* FOREVER *
|
||||
*******************************/
|
||||
|
||||
forever :-
|
||||
forever(_).
|
||||
|
||||
forever(Test) :-
|
||||
gethostname(Host),
|
||||
atomic_list_concat([Host, -, 'forever.txt'], Log),
|
||||
protocol(Log),
|
||||
between(1, 10000000, N),
|
||||
get_time(T),
|
||||
convert_time(T, S),
|
||||
format('***** TEST RUN ~w ***** [~w]~n', [N, S]),
|
||||
test(Test),
|
||||
fail.
|
||||
forever(_). % takes very long
|
||||
|
||||
/*******************************
|
||||
* TOPLEVEL *
|
||||
*******************************/
|
||||
|
||||
test :-
|
||||
test(_).
|
||||
|
||||
test(T) :-
|
||||
create_server(5),
|
||||
( test_def(T, Test),
|
||||
( Test = concurrent(Times, Threads, CTest)
|
||||
-> run(concurrent(Times, Threads, test_impl(CTest)))
|
||||
; run(test_impl(Test))
|
||||
),
|
||||
fail
|
||||
; true
|
||||
),
|
||||
stop_server.
|
||||
|
||||
test_def(s1, echo(100)).
|
||||
test_def(s2, big(100000)).
|
||||
test_def(s3, big(10000, 10)).
|
||||
test_def(s5, timeout).
|
||||
test_def(s6, nohost).
|
||||
test_def(s7, noport).
|
||||
test_def(c1, concurrent(10, 3, echo(100))).
|
||||
test_def(c1, concurrent(10, 3, big(10000))).
|
||||
|
||||
|
||||
/*******************************
|
||||
* SERVER *
|
||||
*******************************/
|
||||
|
||||
create_server :-
|
||||
create_server(5).
|
||||
|
||||
create_server(Workers) :-
|
||||
thread_create(server(Workers), _, [alias(server)]),
|
||||
thread_get_message(ready),
|
||||
debug(tcp, 'Server started', []).
|
||||
|
||||
stop_server :-
|
||||
forall(worker(_), client(exit, _)),
|
||||
debug(tcp, 'Workers exited, prepare to join server', []),
|
||||
connect, % make accept return
|
||||
thread_join(server).
|
||||
|
||||
server(Workers) :-
|
||||
create_pool(Workers),
|
||||
tcp_socket(Socket),
|
||||
tcp_bind(Socket, Port),
|
||||
assert(port(Port)),
|
||||
tcp_listen(Socket, 5),
|
||||
debug(tcp, 'Server ready on port ~w', [Port]),
|
||||
thread_send_message(main, ready),
|
||||
repeat,
|
||||
tcp_accept(Socket, Client, _Peer),
|
||||
debug(connect, 'Connection from ~w', [_Peer]),
|
||||
( worker(_)
|
||||
-> thread_send_message(queue, Client),
|
||||
fail
|
||||
; !,
|
||||
tcp_close_socket(Client)
|
||||
),
|
||||
debug(tcp, 'All workers have gone, stopping', []),
|
||||
stop_pool,
|
||||
tcp_close_socket(Socket),
|
||||
retract(port(Port)).
|
||||
|
||||
create_pool(Number) :-
|
||||
message_queue_create(queue),
|
||||
forall(between(1, Number, _),
|
||||
( thread_create(work(queue), Id, []),
|
||||
assert(worker(Id))
|
||||
)).
|
||||
|
||||
|
||||
stop_pool :-
|
||||
forall(worker(_),
|
||||
thread_send_message(queue, stop)),
|
||||
forall(retract(worker(Id)),
|
||||
thread_join(Id)),
|
||||
message_queue_destroy(queue),
|
||||
debug(tcp, 'Pool stopped', []).
|
||||
|
||||
work(Queue) :-
|
||||
repeat,
|
||||
thread_get_message(Queue, Socket),
|
||||
handle(Socket, Data),
|
||||
Data == exit, !.
|
||||
|
||||
handle(Client, Data) :-
|
||||
tcp_open_socket(Client, In, Out),
|
||||
read(In, Data),
|
||||
% open_null_stream(Null),
|
||||
% copy_stream_data(In, Null),
|
||||
% close(Null),
|
||||
close(In),
|
||||
thread_self(Me),
|
||||
debug(data, '~p: Received ~W', [Me, Data, [max_depth(10)]]),
|
||||
( debugging(thread)
|
||||
-> assert(on_thread(Me, Data))
|
||||
; true
|
||||
),
|
||||
do_work(Data, Answer),
|
||||
writeq(Out, Answer),
|
||||
write(Out, '.\n'),
|
||||
debug(data, 'Sent ~W', [Answer, [max_depth(10)]]),
|
||||
flush_output(Out),
|
||||
close(Out).
|
||||
|
||||
thread_join(Id) :-
|
||||
thread_join(Id, Status),
|
||||
( Status == true
|
||||
-> true
|
||||
; format('WRONG join-status for ~w: ~w~n', [Id, Status])
|
||||
).
|
||||
|
||||
|
||||
do_work(exit, true) :- !,
|
||||
thread_self(Me),
|
||||
debug(tcp, 'Exit worker ~w', [Me]),
|
||||
thread_detach(Me),
|
||||
retract(worker(Me)).
|
||||
do_work(echo(Data), Data) :- !.
|
||||
do_work(sleep(Time), true) :- !,
|
||||
sleep(Time).
|
||||
|
||||
|
||||
/*******************************
|
||||
* CLIENT *
|
||||
*******************************/
|
||||
|
||||
host(localhost).
|
||||
|
||||
client(Term, Reply) :-
|
||||
client(Term, Reply, 0).
|
||||
client(Term, Reply, TimeOut) :-
|
||||
host(Host),
|
||||
port(Port),
|
||||
client(Host, Port, TimeOut, Term, Reply).
|
||||
client(Host, Port, Timeout, Term, Reply) :-
|
||||
tcp_socket(Socket),
|
||||
tcp_connect(Socket, Host:Port),
|
||||
tcp_open_socket(Socket, In, Out),
|
||||
send_output(Term, Out),
|
||||
close(Out),
|
||||
( Timeout == 0
|
||||
-> true
|
||||
; set_stream(In, timeout(Timeout))
|
||||
),
|
||||
call_cleanup(read(In, Reply), close(In)),
|
||||
debug(data, 'Reply = ~W', [Reply, [max_depth(10)]]).
|
||||
|
||||
send_output(List, Out) :-
|
||||
is_list(List), !,
|
||||
send_list_output(List, Out).
|
||||
send_output(Term, Out) :-
|
||||
send_term(Term, Out).
|
||||
|
||||
send_term(Term, Out) :-
|
||||
writeq(Out, Term),
|
||||
write(Out, '.\n').
|
||||
|
||||
send_list_output([], _).
|
||||
send_list_output([H|T], Out) :-
|
||||
send_term(H, Out),
|
||||
send_list_output(T, Out).
|
||||
|
||||
echo(Term) :-
|
||||
client(echo(Term), Reply),
|
||||
assertion(Term =@= Reply).
|
||||
|
||||
connect :-
|
||||
host(Host),
|
||||
port(Port),
|
||||
tcp_socket(Socket),
|
||||
tcp_connect(Socket, Host:Port),
|
||||
tcp_close_socket(Socket).
|
||||
|
||||
|
||||
/*******************************
|
||||
* TESTS *
|
||||
*******************************/
|
||||
|
||||
% echo(Times)
|
||||
%
|
||||
% Send a lot of short messages to the server
|
||||
|
||||
test_impl(echo(Times)) :- !,
|
||||
forall(between(1, Times, N),
|
||||
echo(N)).
|
||||
|
||||
% big(Size)
|
||||
%
|
||||
% Send a list of 1...Size to the server
|
||||
|
||||
test_impl(big(Size)) :-
|
||||
findall(X, between(1, Size, X), L),
|
||||
echo(L).
|
||||
|
||||
% big(Size, Times)
|
||||
%
|
||||
% Send Times lists of 1...Size to the server
|
||||
|
||||
test_impl(big(Size, Times)) :-
|
||||
findall(X, between(1, Size, X), L),
|
||||
forall(between(1, Times, _),
|
||||
echo(L)).
|
||||
|
||||
test_impl(timeout) :-
|
||||
catch(client(sleep(2), _Reply, 1), E, true),
|
||||
assertion(E = error(timeout_error(read, '$stream'(_)), _G285)).
|
||||
|
||||
|
||||
/*******************************
|
||||
* RUN *
|
||||
*******************************/
|
||||
|
||||
:- meta_predicate
|
||||
run(:).
|
||||
|
||||
run(Goal) :-
|
||||
debug(run, ' ** Run ~p ...', [Goal]),
|
||||
call_cleanup(Goal, E, cleanup(Goal, E)).
|
||||
|
||||
cleanup(Goal, E) :-
|
||||
debug(run, ' ** Finished ~p: ~p', [Goal, E]).
|
||||
|
||||
|
||||
/*******************************
|
||||
* SPECIAL TESTS *
|
||||
*******************************/
|
||||
|
||||
test_impl(nohost) :-
|
||||
tcp_socket(S),
|
||||
catch(tcp_connect(S, 'foo.bar':80), E, true),
|
||||
tcp_close_socket(S),
|
||||
assertion(E =@= error(socket_error('Host not found'), _)).
|
||||
|
||||
test_impl(noport) :-
|
||||
tcp_socket(S),
|
||||
catch(call_with_time_limit(5, tcp_connect(S, localhost:4321)),
|
||||
E, true),
|
||||
tcp_close_socket(S),
|
||||
assertion(E = error(socket_error(_), _)).
|
||||
|
||||
|
||||
/*******************************
|
||||
* DISTRIBUTION *
|
||||
*******************************/
|
||||
|
||||
on_threads :-
|
||||
( bagof(D, on_thread(Id, D), Ds),
|
||||
length(Ds, L),
|
||||
format('Thread ~w handled ~w~n', [Id, L]),
|
||||
fail
|
||||
; true
|
||||
).
|
||||
|
||||
clean :-
|
||||
retractall(on_thread(_, _)).
|
||||
|
||||
|
||||
/*******************************
|
||||
* CONCUR *
|
||||
*******************************/
|
||||
|
||||
:- meta_predicate
|
||||
concurrent(+, +, :),
|
||||
ok(:).
|
||||
|
||||
% concurrent(+Times, +Threads, :Goal)
|
||||
%
|
||||
% Run Goal Times times concurrent in Threads
|
||||
|
||||
concurrent(Times, 1, Goal) :- !,
|
||||
forall(between(1, Times, _),
|
||||
ok(Goal)).
|
||||
concurrent(Times, Threads, Goal) :-
|
||||
strip_module(Goal, M, G),
|
||||
message_queue_create(Done),
|
||||
message_queue_create(Queue),
|
||||
create_workers(Threads, M, Queue, Done),
|
||||
forall(between(1, Times, _),
|
||||
thread_send_message(Queue, goal(G))),
|
||||
debug(concurrent, 'Waiting for ~w replies', [Times]),
|
||||
wait_n(Times, Done),
|
||||
forall(between(1, Threads, _),
|
||||
thread_send_message(Queue, done)),
|
||||
debug(concurrent, 'Waiting for ~w threads', [Threads]),
|
||||
wait_n(Threads, Done),
|
||||
message_queue_destroy(Queue),
|
||||
message_queue_destroy(Done),
|
||||
debug(concurrent, 'All done', []).
|
||||
|
||||
wait_n(0, _) :- !.
|
||||
wait_n(N, Queue) :-
|
||||
thread_get_message(Queue, done),
|
||||
N2 is N - 1,
|
||||
wait_n(N2, Queue).
|
||||
|
||||
create_workers(N, Module, Queue, Done) :-
|
||||
N > 0, !,
|
||||
thread_create(worker(Module, Queue, Done),
|
||||
_,
|
||||
[ detached(true)
|
||||
]),
|
||||
N2 is N - 1,
|
||||
create_workers(N2, Module, Queue, Done).
|
||||
create_workers(_, _, _, _).
|
||||
|
||||
|
||||
worker(Module, Queue, Done) :-
|
||||
thread_get_message(Queue, Message),
|
||||
( Message = goal(Goal)
|
||||
-> ok(Module:Goal),
|
||||
thread_send_message(Done, done),
|
||||
worker(Module, Queue, Done)
|
||||
; thread_send_message(Done, done)
|
||||
).
|
||||
|
||||
ok(Goal) :-
|
||||
( catch(Goal, E, true)
|
||||
-> ( var(E)
|
||||
-> true
|
||||
; print_message(error, E)
|
||||
)
|
||||
; print_message(warning, failed(Goal))
|
||||
).
|
||||
|
||||
/*******************************
|
||||
* DEBUG *
|
||||
*******************************/
|
||||
|
||||
%:- interactor.
|
||||
:- debug(tcp).
|
||||
%:- debug(data).
|
||||
:- debug(run).
|
||||
:- debug(concurrent).
|
||||
Reference in New Issue
Block a user