420 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Prolog
		
	
	
	
	
	
			
		
		
	
	
			420 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Prolog
		
	
	
	
	
	
| /*  $Id$
 | |
| 
 | |
|     Part of SWI-Prolog
 | |
| 
 | |
|     Author:        Jan Wielemaker
 | |
|     E-mail:        jan@swi.psy.uva.nl
 | |
|     WWW:           http://www.swi-prolog.org
 | |
|     Copyright (C): 1985-2002, University of Amsterdam
 | |
| 
 | |
|     This program is free software; you can redistribute it and/or
 | |
|     modify it under the terms of the GNU General Public License
 | |
|     as published by the Free Software Foundation; either version 2
 | |
|     of the License, or (at your option) any later version.
 | |
| 
 | |
|     This program is distributed in the hope that it will be useful,
 | |
|     but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|     GNU General Public License for more details.
 | |
| 
 | |
|     You should have received a copy of the GNU Lesser General Public
 | |
|     License along with this library; if not, write to the Free Software
 | |
|     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 | |
| 
 | |
|     As a special exception, if you link this library with other files,
 | |
|     compiled with a Free Software compiler, to produce an executable, this
 | |
|     library does not by itself cause the resulting executable to be covered
 | |
|     by the GNU General Public License. This exception does not however
 | |
|     invalidate any other reasons why the executable file might be covered by
 | |
|     the GNU General Public License.
 | |
| */
 | |
| 
 | |
| /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
 | |
| Stress-testing sockets, message queues and   threads. These elements are
 | |
| the most important and complicated parts   of multi-threaded servers and
 | |
| require intensive testing. This program performs the following steps for
 | |
| each test:
 | |
| 
 | |
| 	* Create a server consisting of an accepting thread and 5 workers.
 | |
| 	* Run the tests from test_def.
 | |
| 	* Destroy the server
 | |
| 
 | |
| For simple tests, get yourself a free machine and run
 | |
| 
 | |
| 	?- forever.
 | |
| 
 | |
| It creates a logfile named <host>-forever.txt
 | |
| - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
 | |
| 
 | |
| :- asserta(user:file_search_path(foreign, '.')).
 | |
| :- use_module(socket).
 | |
| :- use_module(library(debug)).
 | |
| 
 | |
| :- dynamic
 | |
| 	on_thread/2,
 | |
| 	port/1,
 | |
| 	worker/1.
 | |
| 
 | |
| :- discontiguous
 | |
| 	test_impl/1.
 | |
| 
 | |
| 		 /*******************************
 | |
| 		 *	       FOREVER		*
 | |
| 		 *******************************/
 | |
| 
 | |
| forever :-
 | |
| 	forever(_).
 | |
| 
 | |
| forever(Test) :-
 | |
| 	gethostname(Host),
 | |
| 	atomic_list_concat([Host, -, 'forever.txt'], Log),
 | |
| 	protocol(Log),
 | |
| 	between(1, 10000000, N),
 | |
| 	  get_time(T),
 | |
| 	  convert_time(T, S),
 | |
| 	  format('***** TEST RUN ~w ***** [~w]~n', [N, S]),
 | |
| 	  test(Test),
 | |
| 	fail.
 | |
| forever(_).				% takes very long
 | |
| 
 | |
| 		 /*******************************
 | |
| 		 *	       TOPLEVEL		*
 | |
| 		 *******************************/
 | |
| 
 | |
| test :-
 | |
| 	test(_).
 | |
| 
 | |
| test(T) :-
 | |
| 	create_server(5),
 | |
| 	(   test_def(T, Test),
 | |
| 	    (	Test = concurrent(Times, Threads, CTest)
 | |
| 	    ->	run(concurrent(Times, Threads, test_impl(CTest)))
 | |
| 	    ;	run(test_impl(Test))
 | |
| 	    ),
 | |
| 	    fail
 | |
| 	;   true
 | |
| 	),
 | |
| 	stop_server.
 | |
| 
 | |
| test_def(s1, echo(100)).
 | |
| test_def(s2, big(100000)).
 | |
| test_def(s3, big(10000, 10)).
 | |
| test_def(s5, timeout).
 | |
| test_def(s6, nohost).
 | |
| test_def(s7, noport).
 | |
| test_def(c1, concurrent(10, 3, echo(100))).
 | |
| test_def(c1, concurrent(10, 3, big(10000))).
 | |
| 
 | |
| 
 | |
| 		 /*******************************
 | |
| 		 *		SERVER		*
 | |
| 		 *******************************/
 | |
| 
 | |
| create_server :-
 | |
| 	create_server(5).
 | |
| 
 | |
| create_server(Workers) :-
 | |
| 	thread_create(server(Workers), _, [alias(server)]),
 | |
| 	thread_get_message(ready),
 | |
| 	debug(tcp, 'Server started', []).
 | |
| 
 | |
| stop_server :-
 | |
| 	forall(worker(_), client(exit, _)),
 | |
| 	debug(tcp, 'Workers exited, prepare to join server', []),
 | |
| 	connect,			% make accept return
 | |
| 	thread_join(server).
 | |
| 
 | |
| server(Workers) :-
 | |
| 	create_pool(Workers),
 | |
| 	tcp_socket(Socket),
 | |
| 	tcp_bind(Socket, Port),
 | |
| 	assert(port(Port)),
 | |
| 	tcp_listen(Socket, 5),
 | |
| 	debug(tcp, 'Server ready on port ~w', [Port]),
 | |
| 	thread_send_message(main, ready),
 | |
| 	repeat,
 | |
| 	  tcp_accept(Socket, Client, _Peer),
 | |
| 	  debug(connect, 'Connection from ~w', [_Peer]),
 | |
| 	  (   worker(_)
 | |
| 	  ->  thread_send_message(queue, Client),
 | |
| 	      fail
 | |
| 	  ;   !,
 | |
| 	      tcp_close_socket(Client)
 | |
| 	  ),
 | |
| 	debug(tcp, 'All workers have gone, stopping', []),
 | |
| 	stop_pool,
 | |
| 	tcp_close_socket(Socket),
 | |
| 	retract(port(Port)).
 | |
| 
 | |
| create_pool(Number) :-
 | |
| 	message_queue_create(queue),
 | |
| 	forall(between(1, Number, _),
 | |
| 	       (   thread_create(work(queue), Id, []),
 | |
| 		   assert(worker(Id))
 | |
| 	       )).
 | |
| 
 | |
| 
 | |
| stop_pool :-
 | |
| 	forall(worker(_),
 | |
| 	       thread_send_message(queue, stop)),
 | |
| 	forall(retract(worker(Id)),
 | |
| 	       thread_join(Id)),
 | |
| 	message_queue_destroy(queue),
 | |
| 	debug(tcp, 'Pool stopped', []).
 | |
| 
 | |
| work(Queue) :-
 | |
| 	repeat,
 | |
| 	thread_get_message(Queue, Socket),
 | |
| 	handle(Socket, Data),
 | |
| 	Data == exit, !.
 | |
| 
 | |
| handle(Client, Data) :-
 | |
| 	tcp_open_socket(Client, In, Out),
 | |
| 	read(In, Data),
 | |
| %	open_null_stream(Null),
 | |
| %	copy_stream_data(In, Null),
 | |
| %	close(Null),
 | |
| 	close(In),
 | |
| 	thread_self(Me),
 | |
| 	debug(data, '~p: Received ~W', [Me, Data, [max_depth(10)]]),
 | |
| 	(   debugging(thread)
 | |
| 	->  assert(on_thread(Me, Data))
 | |
| 	;   true
 | |
| 	),
 | |
| 	do_work(Data, Answer),
 | |
| 	writeq(Out, Answer),
 | |
| 	write(Out, '.\n'),
 | |
| 	debug(data, 'Sent ~W', [Answer, [max_depth(10)]]),
 | |
| 	flush_output(Out),
 | |
| 	close(Out).
 | |
| 
 | |
| thread_join(Id) :-
 | |
| 	thread_join(Id, Status),
 | |
| 	(   Status == true
 | |
| 	->  true
 | |
| 	;   format('WRONG join-status for ~w: ~w~n', [Id, Status])
 | |
| 	).
 | |
| 
 | |
| 
 | |
| do_work(exit, true) :- !,
 | |
| 	thread_self(Me),
 | |
| 	debug(tcp, 'Exit worker ~w', [Me]),
 | |
| 	thread_detach(Me),
 | |
| 	retract(worker(Me)).
 | |
| do_work(echo(Data), Data) :- !.
 | |
| do_work(sleep(Time), true) :- !,
 | |
| 	sleep(Time).
 | |
| 
 | |
| 
 | |
| 		 /*******************************
 | |
| 		 *	      CLIENT		*
 | |
| 		 *******************************/
 | |
| 
 | |
| host(localhost).
 | |
| 
 | |
| client(Term, Reply) :-
 | |
| 	client(Term, Reply, 0).
 | |
| client(Term, Reply, TimeOut) :-
 | |
| 	host(Host),
 | |
| 	port(Port),
 | |
| 	client(Host, Port, TimeOut, Term, Reply).
 | |
| client(Host, Port, Timeout, Term, Reply) :-
 | |
| 	tcp_socket(Socket),
 | |
| 	tcp_connect(Socket, Host:Port),
 | |
| 	tcp_open_socket(Socket, In, Out),
 | |
| 	send_output(Term, Out),
 | |
| 	close(Out),
 | |
| 	(   Timeout == 0
 | |
| 	->  true
 | |
| 	;   set_stream(In, timeout(Timeout))
 | |
| 	),
 | |
| 	call_cleanup(read(In, Reply), close(In)),
 | |
| 	debug(data, 'Reply = ~W', [Reply, [max_depth(10)]]).
 | |
| 
 | |
| send_output(List, Out) :-
 | |
| 	is_list(List), !,
 | |
| 	send_list_output(List, Out).
 | |
| send_output(Term, Out) :-
 | |
| 	send_term(Term, Out).
 | |
| 
 | |
| send_term(Term, Out) :-
 | |
| 	writeq(Out, Term),
 | |
| 	write(Out, '.\n').
 | |
| 
 | |
| send_list_output([], _).
 | |
| send_list_output([H|T], Out) :-
 | |
| 	send_term(H, Out),
 | |
| 	send_list_output(T, Out).
 | |
| 
 | |
| echo(Term) :-
 | |
| 	client(echo(Term), Reply),
 | |
| 	assertion(Term =@= Reply).
 | |
| 
 | |
| connect :-
 | |
| 	host(Host),
 | |
| 	port(Port),
 | |
| 	tcp_socket(Socket),
 | |
| 	tcp_connect(Socket, Host:Port),
 | |
| 	tcp_close_socket(Socket).
 | |
| 
 | |
| 
 | |
| 		 /*******************************
 | |
| 		 *	       TESTS		*
 | |
| 		 *******************************/
 | |
| 
 | |
| %	echo(Times)
 | |
| %
 | |
| %	Send a lot of short messages to the server
 | |
| 
 | |
| test_impl(echo(Times)) :- !,
 | |
| 	forall(between(1, Times, N),
 | |
| 	       echo(N)).
 | |
| 
 | |
| %	big(Size)
 | |
| %
 | |
| %	Send a list of 1...Size to the server
 | |
| 
 | |
| test_impl(big(Size)) :-
 | |
| 	findall(X, between(1, Size, X), L),
 | |
| 	echo(L).
 | |
| 
 | |
| %	big(Size, Times)
 | |
| %
 | |
| %	Send Times lists of 1...Size to the server
 | |
| 
 | |
| test_impl(big(Size, Times)) :-
 | |
| 	findall(X, between(1, Size, X), L),
 | |
| 	forall(between(1, Times, _),
 | |
| 	       echo(L)).
 | |
| 
 | |
| test_impl(timeout) :-
 | |
| 	catch(client(sleep(2), _Reply, 1), E, true),
 | |
| 	assertion(E = error(timeout_error(read, '$stream'(_)), _G285)).
 | |
| 
 | |
| 
 | |
| 		 /*******************************
 | |
| 		 *	       RUN		*
 | |
| 		 *******************************/
 | |
| 
 | |
| :- meta_predicate
 | |
| 	run(:).
 | |
| 
 | |
| run(Goal) :-
 | |
| 	debug(run, ' ** Run ~p ...', [Goal]),
 | |
| 	call_cleanup(Goal, E, cleanup(Goal, E)).
 | |
| 
 | |
| cleanup(Goal, E) :-
 | |
| 	debug(run, ' ** Finished ~p: ~p', [Goal, E]).
 | |
| 
 | |
| 
 | |
| 		 /*******************************
 | |
| 		 *	  SPECIAL TESTS		*
 | |
| 		 *******************************/
 | |
| 
 | |
| test_impl(nohost) :-
 | |
| 	tcp_socket(S),
 | |
| 	catch(tcp_connect(S, 'foo.bar':80), E, true),
 | |
| 	tcp_close_socket(S),
 | |
| 	assertion(E =@= error(socket_error('Host not found'), _)).
 | |
| 
 | |
| test_impl(noport) :-
 | |
| 	tcp_socket(S),
 | |
| 	catch(call_with_time_limit(5, tcp_connect(S, localhost:4321)),
 | |
| 	      E, true),
 | |
| 	tcp_close_socket(S),
 | |
| 	assertion(E = error(socket_error(_), _)).
 | |
| 
 | |
| 
 | |
| 		 /*******************************
 | |
| 		 *	   DISTRIBUTION		*
 | |
| 		 *******************************/
 | |
| 
 | |
| on_threads :-
 | |
| 	(   bagof(D, on_thread(Id, D), Ds),
 | |
| 	    length(Ds, L),
 | |
| 	    format('Thread ~w handled ~w~n', [Id, L]),
 | |
| 	    fail
 | |
| 	;   true
 | |
| 	).
 | |
| 
 | |
| clean :-
 | |
| 	retractall(on_thread(_, _)).
 | |
| 
 | |
| 
 | |
| 		 /*******************************
 | |
| 		 *	       CONCUR		*
 | |
| 		 *******************************/
 | |
| 
 | |
| :- meta_predicate
 | |
| 	concurrent(+, +, :),
 | |
| 	ok(:).
 | |
| 
 | |
| %	concurrent(+Times, +Threads, :Goal)
 | |
| %
 | |
| %	Run Goal Times times concurrent in Threads
 | |
| 
 | |
| concurrent(Times, 1, Goal) :- !,
 | |
| 	forall(between(1, Times, _),
 | |
| 	       ok(Goal)).
 | |
| concurrent(Times, Threads, Goal) :-
 | |
| 	strip_module(Goal, M, G),
 | |
| 	message_queue_create(Done),
 | |
| 	message_queue_create(Queue),
 | |
| 	create_workers(Threads, M, Queue, Done),
 | |
| 	forall(between(1, Times, _),
 | |
| 	       thread_send_message(Queue, goal(G))),
 | |
| 	debug(concurrent, 'Waiting for ~w replies', [Times]),
 | |
| 	wait_n(Times, Done),
 | |
| 	forall(between(1, Threads, _),
 | |
| 	       thread_send_message(Queue, done)),
 | |
| 	debug(concurrent, 'Waiting for ~w threads', [Threads]),
 | |
| 	wait_n(Threads, Done),
 | |
| 	message_queue_destroy(Queue),
 | |
| 	message_queue_destroy(Done),
 | |
| 	debug(concurrent, 'All done', []).
 | |
| 
 | |
| wait_n(0, _) :- !.
 | |
| wait_n(N, Queue) :-
 | |
| 	thread_get_message(Queue, done),
 | |
| 	N2 is N - 1,
 | |
| 	wait_n(N2, Queue).
 | |
| 
 | |
| create_workers(N, Module, Queue, Done) :-
 | |
| 	N > 0, !,
 | |
| 	thread_create(worker(Module, Queue, Done),
 | |
| 		      _,
 | |
| 		      [ detached(true)
 | |
| 		      ]),
 | |
| 	N2 is N - 1,
 | |
| 	create_workers(N2, Module, Queue, Done).
 | |
| create_workers(_, _, _, _).
 | |
| 
 | |
| 
 | |
| worker(Module, Queue, Done) :-
 | |
| 	thread_get_message(Queue, Message),
 | |
| 	(   Message = goal(Goal)
 | |
| 	->  ok(Module:Goal),
 | |
| 	    thread_send_message(Done, done),
 | |
| 	    worker(Module, Queue, Done)
 | |
| 	;   thread_send_message(Done, done)
 | |
| 	).
 | |
| 
 | |
| ok(Goal) :-
 | |
| 	(   catch(Goal, E, true)
 | |
| 	->  (   var(E)
 | |
| 	    ->  true
 | |
| 	    ;   print_message(error, E)
 | |
| 	    )
 | |
| 	;   print_message(warning, failed(Goal))
 | |
| 	).
 | |
| 
 | |
| 		 /*******************************
 | |
| 		 *	       DEBUG		*
 | |
| 		 *******************************/
 | |
| 
 | |
| %:- interactor.
 | |
| :- debug(tcp).
 | |
| %:- debug(data).
 | |
| :- debug(run).
 | |
| :- debug(concurrent).
 |