1109 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Prolog
		
	
	
	
	
	
			
		
		
	
	
			1109 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Prolog
		
	
	
	
	
	
/*  $Id$
 | 
						|
 | 
						|
    Part of SWI-Prolog
 | 
						|
 | 
						|
    Author:        Jan Wielemaker
 | 
						|
    E-mail:        wielemak@science.uva.nl
 | 
						|
    WWW:           http://www.swi-prolog.org
 | 
						|
    Copyright (C): 1985-2007, University of Amsterdam
 | 
						|
 | 
						|
    This program is free software; you can redistribute it and/or
 | 
						|
    modify it under the terms of the GNU General Public License
 | 
						|
    as published by the Free Software Foundation; either version 2
 | 
						|
    of the License, or (at your option) any later version.
 | 
						|
 | 
						|
    This program is distributed in the hope that it will be useful,
 | 
						|
    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
    GNU General Public License for more details.
 | 
						|
 | 
						|
    You should have received a copy of the GNU Lesser General Public
 | 
						|
    License along with this library; if not, write to the Free Software
 | 
						|
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 | 
						|
 | 
						|
    As a special exception, if you link this library with other files,
 | 
						|
    compiled with a Free Software compiler, to produce an executable, this
 | 
						|
    library does not by itself cause the resulting executable to be covered
 | 
						|
    by the GNU General Public License. This exception does not however
 | 
						|
    invalidate any other reasons why the executable file might be covered by
 | 
						|
    the GNU General Public License.
 | 
						|
*/
 | 
						|
 | 
						|
:- module(rdf_persistency,
 | 
						|
	  [ rdf_attach_db/2,		% +Directory, +Options
 | 
						|
	    rdf_detach_db/0,		% +Detach current DB
 | 
						|
	    rdf_current_db/1,		% -Directory
 | 
						|
	    rdf_persistency/2,		% +DB, +Bool
 | 
						|
	    rdf_flush_journals/1,	% +Options
 | 
						|
	    rdf_journal_file/2,		% ?DB, ?JournalFile
 | 
						|
	    rdf_db_to_file/2		% ?DB, ?FileBase
 | 
						|
	  ]).
 | 
						|
:- use_module(library('semweb/rdf_db')).
 | 
						|
:- use_module(library(lists)).
 | 
						|
:- use_module(library(url)).
 | 
						|
:- use_module(library(debug)).
 | 
						|
:- use_module(library(error)).
 | 
						|
:- use_module(library(thread)).
 | 
						|
:- use_module(library(pairs)).
 | 
						|
 | 
						|
 | 
						|
/** <module> RDF persistency plugin
 | 
						|
 | 
						|
This  module  provides  persistency   for    rdf_db.pl   based   on  the
 | 
						|
rdf_monitor/2 predicate to  track  changes   to  the  repository.  Where
 | 
						|
previous  versions  used  autosafe  of  the  whole  database  using  the
 | 
						|
quick-load format of rdf_db, this version is  based on a quick-load file
 | 
						|
per source (4th argument of rdf/4), and journalling for edit operations.
 | 
						|
 | 
						|
The result is safe, avoids frequent small   changes to large files which
 | 
						|
makes synchronisation and backup expensive and avoids long disruption of
 | 
						|
the server doing the autosafe. Only loading large files disrupts service
 | 
						|
for some time.
 | 
						|
 | 
						|
The persistent backup of the database is  realised in a directory, using
 | 
						|
a lock file to avoid corruption due to concurrent access. Each source is
 | 
						|
represented by two files, the latest snapshot   and a journal. The state
 | 
						|
is restored by loading  the  snapshot   and  replaying  the journal. The
 | 
						|
predicate rdf_flush_journals/1 can be used to create fresh snapshots and
 | 
						|
delete the journals.
 | 
						|
 | 
						|
@tbd if there is a complete `.new'   snapshot  and no journal, we should
 | 
						|
move the .new to the plain snapshot name as a means of recovery.
 | 
						|
 | 
						|
@see	rdf_edit.pl
 | 
						|
*/
 | 
						|
 | 
						|
:- volatile
 | 
						|
	rdf_directory/1,
 | 
						|
	rdf_option/1,
 | 
						|
	source_journal_fd/2,
 | 
						|
	db_file_base/2,
 | 
						|
	file_base_db/2.
 | 
						|
:- dynamic
 | 
						|
	rdf_directory/1,		% Absolute path
 | 
						|
	rdf_option/1,			% Defined options
 | 
						|
	source_journal_fd/2,		% DB, JournalFD
 | 
						|
	db_file_base/2,			% DB, FileBase
 | 
						|
	file_base_db/2.			% FileBase, DB
 | 
						|
 | 
						|
:- meta_predicate
 | 
						|
	no_agc(0).
 | 
						|
 | 
						|
%%	rdf_attach_db(+Directory, +Options)
 | 
						|
%
 | 
						|
%	Start persistent operations using Directory   as  place to store
 | 
						|
%	files.   There are several cases:
 | 
						|
%
 | 
						|
%		* Empty DB, existing directory
 | 
						|
%		Load the DB from the existing directory
 | 
						|
%
 | 
						|
%		* Full DB, empty directory
 | 
						|
%		Create snapshots for all sources in directory
 | 
						|
%
 | 
						|
%	Options:
 | 
						|
%
 | 
						|
%		* concurrency(+Jobs)
 | 
						|
%		Number of threads to use for loading the initial
 | 
						|
%		database.  If not provided it is the number of CPUs
 | 
						|
%		as optained from the flag =cpu_count=.
 | 
						|
%
 | 
						|
%		* max_open_journals(+Count)
 | 
						|
%		Maximum number of journals kept open.  If not provided,
 | 
						|
%		the default is 10.  See limit_fd_pool/0.
 | 
						|
%
 | 
						|
%		* silent(+BoolOrBrief)
 | 
						|
%		If =true= (default =false=), do not print informational
 | 
						|
%		messages.  Finally, if =brief= it will show minimal
 | 
						|
%		feedback.
 | 
						|
%
 | 
						|
%		* log_nested_transactions(+Boolean)
 | 
						|
%		If =true=, nested _log_ transactions are added to the
 | 
						|
%		journal information.  By default (=false=), no log-term
 | 
						|
%		is added for nested transactions.
 | 
						|
 | 
						|
rdf_attach_db(DirSpec, Options) :-
 | 
						|
	absolute_file_name(DirSpec,
 | 
						|
			   Directory,
 | 
						|
			   [ access(write),
 | 
						|
			     file_type(directory),
 | 
						|
			     file_errors(fail)
 | 
						|
			   ]), !,
 | 
						|
	(   rdf_directory(Directory)
 | 
						|
	->  true			% update settings?
 | 
						|
	;   rdf_detach_db,
 | 
						|
	    mkdir(Directory),
 | 
						|
	    lock_db(Directory),
 | 
						|
	    assert(rdf_directory(Directory)),
 | 
						|
	    assert_options(Options),
 | 
						|
	    stop_monitor,		% make sure not to register load
 | 
						|
	    no_agc(load_db),
 | 
						|
	    at_halt(rdf_detach_db),
 | 
						|
	    start_monitor
 | 
						|
	).
 | 
						|
rdf_attach_db(DirSpec, Options) :-
 | 
						|
	absolute_file_name(DirSpec,
 | 
						|
			   Directory,
 | 
						|
			   [ solutions(all)
 | 
						|
			   ]),
 | 
						|
	(   exists_directory(Directory)
 | 
						|
	->  access_file(Directory, write)
 | 
						|
	;   catch(make_directory(Directory), _, fail)
 | 
						|
	), !,
 | 
						|
	rdf_attach_db(Directory, Options).
 | 
						|
 | 
						|
 | 
						|
assert_options([]).
 | 
						|
assert_options([H|T]) :-
 | 
						|
	(   option_type(H, Check)
 | 
						|
	->  Check,
 | 
						|
	    assert(rdf_option(H))
 | 
						|
	;   true			% ignore options we do not understand
 | 
						|
	),
 | 
						|
	assert_options(T).
 | 
						|
 | 
						|
option_type(concurrency(X),		must_be(positive_integer, X)).
 | 
						|
option_type(max_open_journals(X),	must_be(positive_integer, X)).
 | 
						|
option_type(silent(X),	       must_be(oneof([true,false,brief]), X)).
 | 
						|
option_type(log_nested_transactions(X),	must_be(boolean, X)).
 | 
						|
 | 
						|
 | 
						|
%%	no_agc(:Goal)
 | 
						|
%
 | 
						|
%	Run Goal with atom garbage collection   disabled. Loading an RDF
 | 
						|
%	database creates large amounts  of  atoms   we  *know*  are  not
 | 
						|
%	garbage.
 | 
						|
 | 
						|
no_agc(Goal) :-
 | 
						|
	current_prolog_flag(agc_margin, Old),
 | 
						|
	set_prolog_flag(agc_margin, 0),
 | 
						|
	call_cleanup(Goal, set_prolog_flag(agc_margin, Old)).
 | 
						|
 | 
						|
 | 
						|
%%	rdf_detach_db is det.
 | 
						|
%
 | 
						|
%	Detach from the  current  database.   Succeeds  silently  if  no
 | 
						|
%	database is attached. Normally called at  the end of the program
 | 
						|
%	through at_halt/1.
 | 
						|
 | 
						|
rdf_detach_db :-
 | 
						|
	debug(halt, 'Detaching database', []),
 | 
						|
	stop_monitor,
 | 
						|
	close_journals,
 | 
						|
	(   retract(rdf_directory(Dir))
 | 
						|
	->  debug(halt, 'DB Directory: ~w', [Dir]),
 | 
						|
	    retractall(rdf_option(_)),
 | 
						|
	    retractall(source_journal_fd(_,_)),
 | 
						|
	    retractall(db_file_base(_,_)),
 | 
						|
	    unlock_db(Dir)
 | 
						|
	;   true
 | 
						|
	).
 | 
						|
 | 
						|
 | 
						|
%%	rdf_current_db(?Dir)
 | 
						|
%
 | 
						|
%	True if Dir is the current RDF persistent database.
 | 
						|
 | 
						|
rdf_current_db(Directory) :-
 | 
						|
	rdf_directory(Dir), !,
 | 
						|
	Dir = Directory.
 | 
						|
 | 
						|
 | 
						|
%%	rdf_flush_journals(+Options)
 | 
						|
%
 | 
						|
%	Flush dirty journals.  Options:
 | 
						|
%
 | 
						|
%		* min_size(+KB)
 | 
						|
%		Only flush if journal is over KB in size.
 | 
						|
%		TBD: sensible default size
 | 
						|
 | 
						|
rdf_flush_journals(Options) :-
 | 
						|
	forall(rdf_source(DB),
 | 
						|
	       rdf_flush_journal(DB, Options)).
 | 
						|
 | 
						|
rdf_flush_journal(DB, Options) :-
 | 
						|
	db_files(DB, _SnapshotFile, JournalFile),
 | 
						|
	db_file(JournalFile, File),
 | 
						|
	(   \+ exists_file(File)
 | 
						|
	->  true
 | 
						|
	;   memberchk(min_size(KB), Options),
 | 
						|
	    size_file(JournalFile, Size),
 | 
						|
	    Size / 1024 < KB
 | 
						|
	->  true
 | 
						|
	;   create_db(DB)
 | 
						|
	).
 | 
						|
 | 
						|
		 /*******************************
 | 
						|
		 *	       LOAD		*
 | 
						|
		 *******************************/
 | 
						|
 | 
						|
%%	load_db is det.
 | 
						|
%
 | 
						|
%	Reload database from the directory specified by rdf_directory/1.
 | 
						|
%	First we find all names graphs using find_dbs/1 and then we load
 | 
						|
%	them.
 | 
						|
 | 
						|
load_db :-
 | 
						|
	rdf_directory(Dir),
 | 
						|
	working_directory(Old, Dir),
 | 
						|
	get_time(Wall0),
 | 
						|
	statistics(cputime, T0),
 | 
						|
	call_cleanup(find_dbs(DBs), working_directory(_, Old)),
 | 
						|
	length(DBs, DBCount),
 | 
						|
	verbosity(DBCount, Silent),
 | 
						|
	make_goals(DBs, Silent, 1, DBCount, Goals),
 | 
						|
	concurrency(Jobs),
 | 
						|
	concurrent(Jobs, Goals, []),
 | 
						|
	statistics(cputime, T1),
 | 
						|
	get_time(Wall1),
 | 
						|
	T is T1 - T0,
 | 
						|
	Wall is Wall1 - Wall0,
 | 
						|
	message_level(Silent, Level),
 | 
						|
	print_message(Level, rdf(restore(attached(DBCount, T/Wall)))).
 | 
						|
 | 
						|
 | 
						|
make_goals([], _, _, _, []).
 | 
						|
make_goals([DB|T0], Silent, I,  Total,
 | 
						|
	   [load_source(DB, Silent, I, Total)|T]) :-
 | 
						|
	I2 is I + 1,
 | 
						|
	make_goals(T0, Silent, I2, Total, T).
 | 
						|
 | 
						|
verbosity(_DBCount, Silent) :-
 | 
						|
	rdf_option(silent(Silent)), !.
 | 
						|
verbosity(DBCount, Silent) :-
 | 
						|
	DBCount > 25, !,
 | 
						|
	Silent = brief.
 | 
						|
verbosity(_DBCount, false).
 | 
						|
 | 
						|
 | 
						|
%%	concurrency(-Jobs)
 | 
						|
%
 | 
						|
%	Number of jobs to run concurrently.
 | 
						|
 | 
						|
concurrency(Jobs) :-
 | 
						|
	rdf_option(concurrency(Jobs)), !.
 | 
						|
concurrency(Jobs) :-
 | 
						|
	current_prolog_flag(cpu_count, Jobs),
 | 
						|
	Jobs > 0, !.
 | 
						|
concurrency(1).
 | 
						|
 | 
						|
 | 
						|
%%	find_dbs(-DBs:list(atom)) is det.
 | 
						|
%
 | 
						|
%	DBs is a  list  of  database   (named  graph)  names,  sorted in
 | 
						|
%	increasing file-size. Small files  are   loaded  first  as these
 | 
						|
%	typically contain the schemas and we   want  to avoid re-hashing
 | 
						|
%	large databases due to added rdfs:subPropertyOf triples.
 | 
						|
 | 
						|
find_dbs(DBs) :-
 | 
						|
	expand_file_name(*, Files),
 | 
						|
	phrase(scan_db_files(Files), Scanned),
 | 
						|
	sort(Scanned, ByDB),
 | 
						|
	join_snapshot_and_journals(ByDB, BySize),
 | 
						|
	keysort(BySize, SortedBySize),
 | 
						|
	pairs_values(SortedBySize, DBs).
 | 
						|
 | 
						|
 | 
						|
%%	scan_db_files(+Files)// is det.
 | 
						|
%
 | 
						|
%	Produces a list of db(DB,  Size,   File)  for all recognised RDF
 | 
						|
%	database files.
 | 
						|
 | 
						|
scan_db_files([]) -->
 | 
						|
	[].
 | 
						|
scan_db_files([File|T]) -->
 | 
						|
	{ file_name_extension(Base, Ext, File),
 | 
						|
	  db_extension(Ext), !,
 | 
						|
	  rdf_db_to_file(DB, Base),
 | 
						|
	  size_file(File, Size)
 | 
						|
	},
 | 
						|
	[ db(DB, Size, File) ],
 | 
						|
	scan_db_files(T).
 | 
						|
scan_db_files([_|T]) -->
 | 
						|
	scan_db_files(T).
 | 
						|
 | 
						|
 | 
						|
db_extension(trp).
 | 
						|
db_extension(jrn).
 | 
						|
 | 
						|
join_snapshot_and_journals([], []).
 | 
						|
join_snapshot_and_journals([db(DB,S0,_)|T0], [S-DB|T]) :- !,
 | 
						|
	same_db(DB, T0, T1, S0, S),
 | 
						|
	join_snapshot_and_journals(T1, T).
 | 
						|
 | 
						|
same_db(DB, [db(DB,S1,_)|T0], T, S0, S) :- !,
 | 
						|
	S2 is S0+S1,
 | 
						|
	same_db(DB, T0, T, S2, S).
 | 
						|
same_db(_, L, L, S, S).
 | 
						|
 | 
						|
 | 
						|
%%	load_source(+DB, +Silent, +Nth, +Total) is det.
 | 
						|
%
 | 
						|
%	Load triples and reload  journal   from  the  indicated snapshot
 | 
						|
%	file.
 | 
						|
%
 | 
						|
%	@param Silent One of =false=, =true= or =brief=
 | 
						|
 | 
						|
load_source(DB, Silent, Nth, Total) :-
 | 
						|
	message_level(Silent, Level),
 | 
						|
	db_files(DB, SnapshotFile, JournalFile),
 | 
						|
	rdf_retractall(_,_,_,DB),
 | 
						|
	statistics(cputime, T0),
 | 
						|
	print_message(Level, rdf(restore(Silent, source(DB, Nth, Total)))),
 | 
						|
	db_file(SnapshotFile, AbsSnapShot),
 | 
						|
	(   exists_file(AbsSnapShot)
 | 
						|
	->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
 | 
						|
	    rdf_load_db(AbsSnapShot)
 | 
						|
	;   true
 | 
						|
	),
 | 
						|
	(   exists_db(JournalFile)
 | 
						|
	->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
 | 
						|
	    load_journal(JournalFile, DB)
 | 
						|
	;   true
 | 
						|
	),
 | 
						|
	statistics(cputime, T1),
 | 
						|
	T is T1 - T0,
 | 
						|
	(   rdf_statistics(triples_by_file(DB, Count))
 | 
						|
	->  true
 | 
						|
	;   Count = 0
 | 
						|
	),
 | 
						|
	print_message(Level, rdf(restore(Silent,
 | 
						|
					 done(DB, T, Count, Nth, Total)))).
 | 
						|
 | 
						|
message_level(true, silent) :- !.
 | 
						|
message_level(_, informational).
 | 
						|
 | 
						|
 | 
						|
		 /*******************************
 | 
						|
		 *	   LOAD JOURNAL		*
 | 
						|
		 *******************************/
 | 
						|
 | 
						|
%%	load_journal(+File:atom, +DB:atom) is det.
 | 
						|
%
 | 
						|
%	Process transactions from the RDF journal File, adding the given
 | 
						|
%	named graph.
 | 
						|
 | 
						|
load_journal(File, DB) :-
 | 
						|
	open_db(File, read, In, []),
 | 
						|
	call_cleanup((  read(In, T0),
 | 
						|
			process_journal(T0, In, DB)
 | 
						|
		     ),
 | 
						|
		     close(In)).
 | 
						|
 | 
						|
process_journal(end_of_file, _, _) :- !.
 | 
						|
process_journal(Term, In, DB) :-
 | 
						|
	(   process_journal_term(Term, DB)
 | 
						|
	->  true
 | 
						|
	;   throw(error(type_error(journal_term, Term), _))
 | 
						|
	),
 | 
						|
	read(In, T2),
 | 
						|
	process_journal(T2, In, DB).
 | 
						|
 | 
						|
process_journal_term(assert(S,P,O), DB) :-
 | 
						|
	rdf_assert(S,P,O,DB).
 | 
						|
process_journal_term(assert(S,P,O,Line), DB) :-
 | 
						|
	rdf_assert(S,P,O,DB:Line).
 | 
						|
process_journal_term(retract(S,P,O), DB) :-
 | 
						|
	rdf_retractall(S,P,O,DB).
 | 
						|
process_journal_term(retract(S,P,O,Line), DB) :-
 | 
						|
	rdf_retractall(S,P,O,DB:Line).
 | 
						|
process_journal_term(update(S,P,O,Action), DB) :-
 | 
						|
	(   rdf_update(S,P,O,DB, Action)
 | 
						|
	->  true
 | 
						|
	;   print_message(warning, rdf(update_failed(S,P,O,Action)))
 | 
						|
	).
 | 
						|
process_journal_term(start(_), _).	% journal open/close
 | 
						|
process_journal_term(end(_), _).
 | 
						|
process_journal_term(begin(_), _).	% logged transaction (compatibility)
 | 
						|
process_journal_term(end, _).
 | 
						|
process_journal_term(begin(_,_,_,_), _). % logged transaction (current)
 | 
						|
process_journal_term(end(_,_,_), _).
 | 
						|
 | 
						|
 | 
						|
		 /*******************************
 | 
						|
		 *	   CREATE JOURNAL	*
 | 
						|
		 *******************************/
 | 
						|
 | 
						|
:- dynamic
 | 
						|
	blocked_db/2,			% DB, Reason
 | 
						|
	transaction_message/3,		% Nesting, Time, Message
 | 
						|
	transaction_db/3.		% Nesting, DB, Id
 | 
						|
 | 
						|
%%	rdf_persistency(+DB, Bool)
 | 
						|
%
 | 
						|
%	Specify whether a database is persistent.  Switching to =false=
 | 
						|
%       kills the persistent state.  Switching to =true= creates it.
 | 
						|
 | 
						|
rdf_persistency(DB, Bool) :-
 | 
						|
	must_be(atom, DB),
 | 
						|
	must_be(boolean, Bool),
 | 
						|
	fail.
 | 
						|
rdf_persistency(DB, false) :- !,
 | 
						|
	(   blocked_db(DB, persistency)
 | 
						|
	->  true
 | 
						|
	;   assert(blocked_db(DB, persistency)),
 | 
						|
	    delete_db(DB)
 | 
						|
	).
 | 
						|
rdf_persistency(DB, true) :-
 | 
						|
	(   retract(blocked_db(DB, persistency))
 | 
						|
	->  create_db(DB)
 | 
						|
	;   true
 | 
						|
	).
 | 
						|
 | 
						|
 | 
						|
%%	start_monitor is det.
 | 
						|
%%	stop_monitor is det.
 | 
						|
%
 | 
						|
%	Start/stop monitoring the RDF database   for  changes and update
 | 
						|
%	the journal.
 | 
						|
 | 
						|
start_monitor :-
 | 
						|
	rdf_monitor(monitor,
 | 
						|
		    [ -assert(load)
 | 
						|
		    ]).
 | 
						|
stop_monitor :-
 | 
						|
	rdf_monitor(monitor,
 | 
						|
		    [ -all
 | 
						|
		    ]).
 | 
						|
 | 
						|
%%	monitor(+Term) is semidet.
 | 
						|
%
 | 
						|
%	Handle an rdf_monitor/2 callback to  deal with persistency. Note
 | 
						|
%	that the monitor calls that come   from rdf_db.pl that deal with
 | 
						|
%	database changes are serialized.  They   do  come from different
 | 
						|
%	threads though.
 | 
						|
 | 
						|
monitor(Msg) :-
 | 
						|
	debug(monitor, 'Monitor: ~p~n', [Msg]),
 | 
						|
	fail.
 | 
						|
monitor(assert(S,P,O,DB:Line)) :- !,
 | 
						|
	\+ blocked_db(DB, _),
 | 
						|
	journal_fd(DB, Fd),
 | 
						|
	open_transaction(DB, Fd),
 | 
						|
	format(Fd, '~q.~n', [assert(S,P,O,Line)]),
 | 
						|
	sync_journal(DB, Fd).
 | 
						|
monitor(assert(S,P,O,DB)) :-
 | 
						|
	\+ blocked_db(DB, _),
 | 
						|
	journal_fd(DB, Fd),
 | 
						|
	open_transaction(DB, Fd),
 | 
						|
	format(Fd, '~q.~n', [assert(S,P,O)]),
 | 
						|
	sync_journal(DB, Fd).
 | 
						|
monitor(retract(S,P,O,DB:Line)) :- !,
 | 
						|
	\+ blocked_db(DB, _),
 | 
						|
	journal_fd(DB, Fd),
 | 
						|
	open_transaction(DB, Fd),
 | 
						|
	format(Fd, '~q.~n', [retract(S,P,O,Line)]),
 | 
						|
	sync_journal(DB, Fd).
 | 
						|
monitor(retract(S,P,O,DB)) :-
 | 
						|
	\+ blocked_db(DB, _),
 | 
						|
	journal_fd(DB, Fd),
 | 
						|
	open_transaction(DB, Fd),
 | 
						|
	format(Fd, '~q.~n', [retract(S,P,O)]),
 | 
						|
	sync_journal(DB, Fd).
 | 
						|
monitor(update(S,P,O,DB:Line,Action)) :- !,
 | 
						|
	\+ blocked_db(DB, _),
 | 
						|
	(   Action = source(NewDB)
 | 
						|
	->  monitor(assert(S,P,O,NewDB)),
 | 
						|
	    monitor(retract(S,P,O,DB:Line))
 | 
						|
	;   journal_fd(DB, Fd),
 | 
						|
	    format(Fd, '~q.~n', [update(S,P,O,Action)]),
 | 
						|
	    sync_journal(DB, Fd)
 | 
						|
	).
 | 
						|
monitor(update(S,P,O,DB,Action)) :-
 | 
						|
	\+ blocked_db(DB, _),
 | 
						|
	(   Action = source(NewDB)
 | 
						|
	->  monitor(assert(S,P,O,NewDB)),
 | 
						|
	    monitor(retract(S,P,O,DB))
 | 
						|
	;   journal_fd(DB, Fd),
 | 
						|
	    open_transaction(DB, Fd),
 | 
						|
	    format(Fd, '~q.~n', [update(S,P,O,Action)]),
 | 
						|
	    sync_journal(DB, Fd)
 | 
						|
	).
 | 
						|
monitor(load(BE, Id)) :-
 | 
						|
	(   BE == begin
 | 
						|
	->  push_state(Id)
 | 
						|
	;   sync_state(Id)
 | 
						|
	).
 | 
						|
monitor(transaction(BE, Id)) :-
 | 
						|
	monitor_transaction(Id, BE).
 | 
						|
 | 
						|
monitor_transaction(load_journal(DB), begin(_)) :- !,
 | 
						|
	assert(blocked_db(DB, journal)).
 | 
						|
monitor_transaction(load_journal(DB), end(_)) :- !,
 | 
						|
	retractall(blocked_db(DB, journal)).
 | 
						|
 | 
						|
monitor_transaction(parse(URI), begin(_)) :- !,
 | 
						|
	(   blocked_db(URI, persistency)
 | 
						|
	->  true
 | 
						|
	;   assert(blocked_db(URI, parse))
 | 
						|
	).
 | 
						|
monitor_transaction(parse(URI), end(_)) :- !,
 | 
						|
	(   retract(blocked_db(URI, parse))
 | 
						|
	->  create_db(URI)
 | 
						|
	;   true
 | 
						|
	).
 | 
						|
monitor_transaction(unload(DB), begin(_)) :- !,
 | 
						|
	(   blocked_db(DB, persistency)
 | 
						|
	->  true
 | 
						|
	;   assert(blocked_db(DB, unload))
 | 
						|
	).
 | 
						|
monitor_transaction(unload(DB), end(_)) :- !,
 | 
						|
	(   retract(blocked_db(DB, unload))
 | 
						|
	->  delete_db(DB)
 | 
						|
	;   true
 | 
						|
	).
 | 
						|
monitor_transaction(log(Msg), begin(N)) :- !,
 | 
						|
	check_nested(N),
 | 
						|
	get_time(Time),
 | 
						|
	asserta(transaction_message(N, Time, Msg)).
 | 
						|
monitor_transaction(log(_), end(N)) :-
 | 
						|
	check_nested(N),
 | 
						|
	retract(transaction_message(N, _, _)), !,
 | 
						|
	findall(DB:Id, retract(transaction_db(N, DB, Id)), DBs),
 | 
						|
	end_transactions(DBs, N).
 | 
						|
monitor_transaction(log(Msg, DB), begin(N)) :- !,
 | 
						|
	check_nested(N),
 | 
						|
	get_time(Time),
 | 
						|
	asserta(transaction_message(N, Time, Msg)),
 | 
						|
	journal_fd(DB, Fd),
 | 
						|
	open_transaction(DB, Fd).
 | 
						|
monitor_transaction(log(Msg, _DB), end(N)) :-
 | 
						|
	monitor_transaction(log(Msg), end(N)).
 | 
						|
monitor_transaction(reset, begin(L)) :-
 | 
						|
	forall(rdf_graph(DB),
 | 
						|
	       monitor_transaction(unload(DB), begin(L))).
 | 
						|
monitor_transaction(reset, end(L)) :-
 | 
						|
	forall(blocked_db(DB, unload),
 | 
						|
	       monitor_transaction(unload(DB), end(L))),
 | 
						|
	retractall(current_transaction_id(_,_)).
 | 
						|
 | 
						|
 | 
						|
%%	check_nested(+Level) is semidet.
 | 
						|
%
 | 
						|
%	True if we must log this transaction.   This  is always the case
 | 
						|
%	for toplevel transactions. Nested transactions   are only logged
 | 
						|
%	if log_nested_transactions(true) is defined.
 | 
						|
 | 
						|
check_nested(0) :- !.
 | 
						|
check_nested(_) :-
 | 
						|
	rdf_option(log_nested_transactions(true)).
 | 
						|
 | 
						|
 | 
						|
%%	open_transaction(+DB, +Fd) is det.
 | 
						|
%
 | 
						|
%	Add a begin(Id, Level, Time,  Message)   term  if  a transaction
 | 
						|
%	involves DB. Id is an incremental   integer, where each database
 | 
						|
%	has its own counter. Level is the nesting level, Time a floating
 | 
						|
%	point timestamp and Message te message   provided as argument to
 | 
						|
%	the log message.
 | 
						|
 | 
						|
open_transaction(DB, Fd) :-
 | 
						|
	transaction_message(N, Time, Msg), !,
 | 
						|
	(   transaction_db(N, DB, _)
 | 
						|
	->  true
 | 
						|
	;   next_transaction_id(DB, Id),
 | 
						|
	    assert(transaction_db(N, DB, Id)),
 | 
						|
	    format(Fd, 'begin(~q, ~q, ~2f, ~q).~n', [Id, N, Time, Msg])
 | 
						|
	).
 | 
						|
open_transaction(_,_).
 | 
						|
 | 
						|
 | 
						|
%%	next_transaction_id(+DB, -Id) is det.
 | 
						|
%
 | 
						|
%	Id is the number to user for  the next logged transaction on DB.
 | 
						|
%	Transactions in each  named  graph   are  numbered  in sequence.
 | 
						|
%	Searching the Id of the last transaction is performed by the 2nd
 | 
						|
%	clause starting 1Kb from the end   and doubling this offset each
 | 
						|
%	failure.
 | 
						|
 | 
						|
:- dynamic
 | 
						|
	current_transaction_id/2.
 | 
						|
 | 
						|
next_transaction_id(DB, Id) :-
 | 
						|
	retract(current_transaction_id(DB, Last)), !,
 | 
						|
	Id is Last + 1,
 | 
						|
	assert(current_transaction_id(DB, Id)).
 | 
						|
next_transaction_id(DB, Id) :-
 | 
						|
	db_files(DB, _, Journal),
 | 
						|
	exists_file(Journal), !,
 | 
						|
	size_file(Journal, Size),
 | 
						|
	open_db(Journal, read, In, []),
 | 
						|
	call_cleanup(iterative_expand(In, Size, Last), close(In)),
 | 
						|
	Id is Last + 1,
 | 
						|
	assert(current_transaction_id(DB, Id)).
 | 
						|
next_transaction_id(DB, 1) :-
 | 
						|
	assert(current_transaction_id(DB, 1)).
 | 
						|
 | 
						|
iterative_expand(_, 0, 0) :- !.
 | 
						|
iterative_expand(In, Size, Last) :-	% Scan growing sections from the end
 | 
						|
	Max is floor(log(Size)/log(2)),
 | 
						|
	between(10, Max, Step),
 | 
						|
	Offset is -(1<<Step),
 | 
						|
	seek(In, Offset, eof, _),
 | 
						|
	skip(In, 10),			% records are line-based
 | 
						|
	read(In, T0),
 | 
						|
	last_transaction_id(T0, In, 0, Last),
 | 
						|
	Last > 0, !.
 | 
						|
iterative_expand(In, _, Last) :-	% Scan the whole file
 | 
						|
	seek(In, 0, bof, _),
 | 
						|
	read(In, T0),
 | 
						|
	last_transaction_id(T0, In, 0, Last).
 | 
						|
 | 
						|
last_transaction_id(end_of_file, _, Last, Last) :- !.
 | 
						|
last_transaction_id(end(Id, _, _), In, _, Last) :-
 | 
						|
	read(In, T1),
 | 
						|
	last_transaction_id(T1, In, Id, Last).
 | 
						|
last_transaction_id(_, In, Id, Last) :-
 | 
						|
	read(In, T1),
 | 
						|
	last_transaction_id(T1, In, Id, Last).
 | 
						|
 | 
						|
 | 
						|
%%	end_transactions(+DBs:list(atom:id)) is det.
 | 
						|
%
 | 
						|
%	End a transaction that affected the  given list of databases. We
 | 
						|
%	write the list of other affected databases as an argument to the
 | 
						|
%	end-term to facilitate fast finding of the related transactions.
 | 
						|
%
 | 
						|
%	In each database, the transaction is   ended with a term end(Id,
 | 
						|
%	Nesting, Others), where  Id  and   Nesting  are  the transaction
 | 
						|
%	identifier and nesting (see open_transaction/2)  and Others is a
 | 
						|
%	list of DB:Id,  indicating  other   databases  affected  by  the
 | 
						|
%	transaction.
 | 
						|
 | 
						|
end_transactions(DBs, N) :-
 | 
						|
	end_transactions(DBs, DBs, N).
 | 
						|
 | 
						|
end_transactions([], _, _).
 | 
						|
end_transactions([DB:Id|T], DBs, N) :-
 | 
						|
	journal_fd(DB, Fd),
 | 
						|
	once(select(DB:Id, DBs, Others)),
 | 
						|
	format(Fd, 'end(~q, ~q, ~q).~n', [Id, N, Others]),
 | 
						|
	sync_journal(DB, Fd),
 | 
						|
	end_transactions(T, DBs, N).
 | 
						|
 | 
						|
 | 
						|
%	State  handling.  We  use   this    for   trapping   changes  by
 | 
						|
%	rdf_load_db/1. In theory, loading such files  can add triples to
 | 
						|
%	multiple sources. In practice this rarely   happens. We save the
 | 
						|
%	current state and sync all  files   that  have changed. The only
 | 
						|
%	drawback of this approach is that loaded files spreading triples
 | 
						|
%	over multiple databases cause all these   databases  to be fully
 | 
						|
%	synchronised. This shouldn't happen very often.
 | 
						|
 | 
						|
:- dynamic
 | 
						|
	pre_load_state/2.
 | 
						|
 | 
						|
push_state(Id) :-
 | 
						|
	get_state(State),
 | 
						|
	asserta(pre_load_state(Id, State)).
 | 
						|
 | 
						|
get_state(State) :-
 | 
						|
	findall(DB-MD5, (rdf_graph(DB), rdf_md5(DB, MD5)), State0),
 | 
						|
	keysort(State0, State).
 | 
						|
 | 
						|
sync_state(Id) :-
 | 
						|
	retract(pre_load_state(Id, PreState)),
 | 
						|
	get_state(AfterState),
 | 
						|
	sync_state(AfterState, PreState).
 | 
						|
 | 
						|
sync_state([], _).
 | 
						|
sync_state([DB-MD5|TA], Pre) :-
 | 
						|
	(   memberchk(DB-MD5P, Pre),
 | 
						|
	    MD5P == MD5
 | 
						|
	->  true
 | 
						|
	;   create_db(DB)
 | 
						|
	),
 | 
						|
	sync_state(TA, Pre).
 | 
						|
 | 
						|
 | 
						|
		 /*******************************
 | 
						|
		 *	   JOURNAL FILES	*
 | 
						|
		 *******************************/
 | 
						|
 | 
						|
%%	journal_fd(+DB, -Stream) is det.
 | 
						|
%
 | 
						|
%	Get an open stream to a journal. If the journal is not open, old
 | 
						|
%	journals are closed to satisfy   the =max_open_journals= option.
 | 
						|
%	Then the journal is opened in   =append= mode. Journal files are
 | 
						|
%	always encoded as UTF-8 for  portability   as  well as to ensure
 | 
						|
%	full coverage of Unicode.
 | 
						|
 | 
						|
journal_fd(DB, Fd) :-
 | 
						|
	source_journal_fd(DB, Fd), !.
 | 
						|
journal_fd(DB, Fd) :-
 | 
						|
	with_mutex(rdf_journal_file,
 | 
						|
		   journal_fd_(DB, Out)),
 | 
						|
	Fd = Out.
 | 
						|
 | 
						|
journal_fd_(DB, Fd) :-
 | 
						|
	source_journal_fd(DB, Fd), !.
 | 
						|
journal_fd_(DB, Fd) :-
 | 
						|
	limit_fd_pool,
 | 
						|
	db_files(DB, _Snapshot, Journal),
 | 
						|
	open_db(Journal, append, Fd,
 | 
						|
		[ close_on_abort(false)
 | 
						|
		]),
 | 
						|
	time_stamp(Now),
 | 
						|
	format(Fd, '~q.~n', [start([time(Now)])]),
 | 
						|
	assert(source_journal_fd(DB, Fd)).		% new one at the end
 | 
						|
 | 
						|
%%	limit_fd_pool is det.
 | 
						|
%
 | 
						|
%	Limit the number of  open   journals  to max_open_journals (10).
 | 
						|
%	Note that calls  from  rdf_monitor/2   are  issued  in different
 | 
						|
%	threads, but as they are part of write operations they are fully
 | 
						|
%	synchronised.
 | 
						|
 | 
						|
limit_fd_pool :-
 | 
						|
	predicate_property(source_journal_fd(_, _), number_of_clauses(N)), !,
 | 
						|
	(   rdf_option(max_open_journals(Max))
 | 
						|
	->  true
 | 
						|
	;   Max = 10
 | 
						|
	),
 | 
						|
	Close is N - Max,
 | 
						|
	forall(between(1, Close, _),
 | 
						|
	       close_oldest_journal).
 | 
						|
limit_fd_pool.
 | 
						|
 | 
						|
close_oldest_journal :-
 | 
						|
	source_journal_fd(DB, _Fd), !,
 | 
						|
	debug(rdf_persistency, 'Closing old journal for ~q', [DB]),
 | 
						|
	close_journal(DB).
 | 
						|
close_oldest_journal.
 | 
						|
 | 
						|
 | 
						|
%%	sync_journal(+DB, +Fd)
 | 
						|
%
 | 
						|
%	Sync journal represented by database and   stream.  If the DB is
 | 
						|
%	involved in a transaction there is   no point flushing until the
 | 
						|
%	end of the transaction.
 | 
						|
 | 
						|
sync_journal(DB, _) :-
 | 
						|
	transaction_db(_, DB, _), !.
 | 
						|
sync_journal(_, Fd) :-
 | 
						|
	flush_output(Fd).
 | 
						|
 | 
						|
%%	close_journal(+DB) is det.
 | 
						|
%
 | 
						|
%	Close the journal associated with DB if it is open.
 | 
						|
 | 
						|
close_journal(DB) :-
 | 
						|
	with_mutex(rdf_journal_file,
 | 
						|
		   close_journal_(DB)).
 | 
						|
 | 
						|
close_journal_(DB) :-
 | 
						|
	(   retract(source_journal_fd(DB, Fd))
 | 
						|
	->  time_stamp(Now),
 | 
						|
	    format(Fd, '~q.~n', [end([time(Now)])]),
 | 
						|
	    close(Fd, [force(true)])
 | 
						|
	;   true
 | 
						|
	).
 | 
						|
 | 
						|
%	close_journals
 | 
						|
%
 | 
						|
%	Close all open journals.
 | 
						|
 | 
						|
close_journals :-
 | 
						|
	forall(source_journal_fd(DB, _),
 | 
						|
	       catch(close_journal(DB), E,
 | 
						|
		     print_message(error, E))).
 | 
						|
 | 
						|
%%	create_db(+DB)
 | 
						|
%
 | 
						|
%	Create a saved version of DB in corresponding file, close and
 | 
						|
%	delete journals.
 | 
						|
 | 
						|
create_db(DB) :-
 | 
						|
	debug(rdf_persistency, 'Saving DB ~w', [DB]),
 | 
						|
	db_abs_files(DB, Snapshot, Journal),
 | 
						|
	atom_concat(Snapshot, '.new', NewSnapshot),
 | 
						|
	(   catch(rdf_save_db(NewSnapshot, DB), _, fail)
 | 
						|
	->  (   exists_file(Journal)
 | 
						|
	    ->  delete_file(Journal)
 | 
						|
	    ;   true
 | 
						|
	    ),
 | 
						|
	    rename_file(NewSnapshot, Snapshot),
 | 
						|
	    debug(rdf_persistency, 'Saved DB ~w', [DB])
 | 
						|
	;   catch(delete_file(NewSnapshot), _, true)
 | 
						|
	).
 | 
						|
 | 
						|
 | 
						|
%%	delete_db(+DB)
 | 
						|
%
 | 
						|
%	Remove snapshot and journal file for DB.
 | 
						|
 | 
						|
delete_db(DB) :-
 | 
						|
	db_abs_files(DB, Snapshot, Journal),
 | 
						|
	(   exists_file(Journal)
 | 
						|
	->  delete_file(Journal)
 | 
						|
	;   true
 | 
						|
	),
 | 
						|
	(   exists_file(Snapshot)
 | 
						|
	->  delete_file(Snapshot)
 | 
						|
	;   true
 | 
						|
	).
 | 
						|
 | 
						|
 | 
						|
		 /*******************************
 | 
						|
		 *	       LOCKING		*
 | 
						|
		 *******************************/
 | 
						|
 | 
						|
%%	lock_db(+Dir)
 | 
						|
%
 | 
						|
%	Lock the database  directory.  This  isn't   safe  as  the  file
 | 
						|
%	operations are not  atomic.  Needs   re-thinking,  but  with the
 | 
						|
%	normal server setting it should be ok.
 | 
						|
 | 
						|
lock_db(Dir) :-
 | 
						|
	lockfile(Dir, File),
 | 
						|
	exists_file(File), !,
 | 
						|
	(   catch(read_file_to_terms(File, Terms, []), _, fail),
 | 
						|
	    Terms = [locked(Args)]
 | 
						|
	->  Context = rdf_locked(Args)
 | 
						|
	;   Context = context(_, 'Database is in use')
 | 
						|
	),
 | 
						|
	throw(error(permission_error(lock, rdf_db, Dir), Context)).
 | 
						|
lock_db(Dir) :-
 | 
						|
	lockfile(Dir, File),
 | 
						|
	open(File, write, Out),
 | 
						|
	(   current_prolog_flag(pid, PID)
 | 
						|
	->  true
 | 
						|
	;   PID = 0			% TBD: Fix in Prolog
 | 
						|
	),
 | 
						|
	time_stamp(Now),
 | 
						|
	format(Out, '/* RDF Database is in use */~n~n', []),
 | 
						|
	format(Out, '~q.~n', [ locked([ time(Now),
 | 
						|
					pid(PID)
 | 
						|
				      ])
 | 
						|
			     ]),
 | 
						|
	close(Out),
 | 
						|
	at_halt(unlock_db(Dir)).
 | 
						|
 | 
						|
unlock_db(Dir) :-
 | 
						|
	lockfile(Dir, File),
 | 
						|
	(   exists_file(File)
 | 
						|
	->  delete_file(File)
 | 
						|
	;   true
 | 
						|
	).
 | 
						|
 | 
						|
		 /*******************************
 | 
						|
		 *	     FILENAMES		*
 | 
						|
		 *******************************/
 | 
						|
 | 
						|
lockfile(Dir, LockFile) :-
 | 
						|
	atomic_list_concat([Dir, /, lock], LockFile).
 | 
						|
 | 
						|
db_file(Base, File) :-
 | 
						|
	rdf_directory(Dir),
 | 
						|
	atomic_list_concat([Dir, /, Base], File).
 | 
						|
 | 
						|
open_db(Base, Mode, Stream, Options) :-
 | 
						|
	db_file(Base, File),
 | 
						|
	open(File, Mode, Stream, [encoding(utf8)|Options]).
 | 
						|
 | 
						|
exists_db(Base) :-
 | 
						|
	db_file(Base, File),
 | 
						|
	exists_file(File).
 | 
						|
 | 
						|
%%	db_files(+DB, -Snapshot, -Journal).
 | 
						|
%%	db_files(-DB, +Snapshot, -Journal).
 | 
						|
%%	db_files(-DB, -Snapshot, +Journal).
 | 
						|
%
 | 
						|
%	True if named graph DB is represented  by the files Snapshot and
 | 
						|
%	Journal. The filenames are local   to the directory representing
 | 
						|
%	the store.
 | 
						|
 | 
						|
db_files(DB, Snapshot, Journal) :-
 | 
						|
	nonvar(DB), !,
 | 
						|
	rdf_db_to_file(DB, Base),
 | 
						|
	atom_concat(Base, '.trp', Snapshot),
 | 
						|
	atom_concat(Base, '.jrn', Journal).
 | 
						|
db_files(DB, Snapshot, Journal) :-
 | 
						|
	nonvar(Snapshot), !,
 | 
						|
	atom_concat(Base, '.trp', Snapshot),
 | 
						|
	atom_concat(Base, '.jrn', Journal),
 | 
						|
	rdf_db_to_file(DB, Base).
 | 
						|
db_files(DB, Snapshot, Journal) :-
 | 
						|
	nonvar(Journal), !,
 | 
						|
	atom_concat(Base, '.jrn', Journal),
 | 
						|
	atom_concat(Base, '.trp', Snapshot),
 | 
						|
	rdf_db_to_file(DB, Base).
 | 
						|
 | 
						|
db_abs_files(DB, Snapshot, Journal) :-
 | 
						|
	db_files(DB, Snapshot0, Journal0),
 | 
						|
	db_file(Snapshot0, Snapshot),
 | 
						|
	db_file(Journal0, Journal).
 | 
						|
 | 
						|
 | 
						|
%%	rdf_journal_file(+DB, -File) is semidet.
 | 
						|
%%	rdf_journal_file(-DB, -File) is nondet.
 | 
						|
%
 | 
						|
%	True if File is the absolute  file   name  of  an existing named
 | 
						|
%	graph DB.
 | 
						|
%
 | 
						|
%	@tbd	Avoid using private rdf_db:rdf_graphs_/1.
 | 
						|
 | 
						|
rdf_journal_file(DB, Journal) :-
 | 
						|
	(   var(DB)
 | 
						|
	->  rdf_db:rdf_graphs_(All),	% also pick the empty graphs
 | 
						|
	    member(DB, All)
 | 
						|
	;   true
 | 
						|
	),
 | 
						|
	db_abs_files(DB, _Snapshot, Journal),
 | 
						|
	exists_file(Journal).
 | 
						|
 | 
						|
 | 
						|
%%	rdf_db_to_file(+DB, -File) is det.
 | 
						|
%%	rdf_db_to_file(-DB, +File) is det.
 | 
						|
%
 | 
						|
%	Translate between database encoding (often an   file or URL) and
 | 
						|
%	the name we store in the  directory.   We  keep  a cache for two
 | 
						|
%	reasons. Speed, but much more important   is that the mapping of
 | 
						|
%	raw --> encoded provided by  www_form_encode/2 is not guaranteed
 | 
						|
%	to be unique by the W3C standards.
 | 
						|
%
 | 
						|
%	@tbd	We keep two predicates for exploiting Prolog indexing.
 | 
						|
%		Once multi-argument indexed is hashed we should clean
 | 
						|
%		this up.
 | 
						|
 | 
						|
rdf_db_to_file(DB, File) :-
 | 
						|
	nonvar(File),
 | 
						|
	file_base_db(File, DB), !.
 | 
						|
rdf_db_to_file(DB, File) :-
 | 
						|
	nonvar(DB),
 | 
						|
	db_file_base(DB, File), !.
 | 
						|
rdf_db_to_file(DB, File) :-
 | 
						|
	url_to_filename(DB, File),
 | 
						|
	assert(db_file_base(DB, File)),
 | 
						|
	assert(file_base_db(File, DB)).
 | 
						|
 | 
						|
%%	url_to_filename(+URL, -FileName) is det.
 | 
						|
%%	url_to_filename(-URL, +FileName) is det.
 | 
						|
%
 | 
						|
%	Turn  a  valid  URL  into  a  filename.  Earlier  versions  used
 | 
						|
%	www_form_encode/2, but this can produce  characters that are not
 | 
						|
%	valid  in  filenames.  We  will  use    the   same  encoding  as
 | 
						|
%	www_form_encode/2,  but  using  our  own    rules   for  allowed
 | 
						|
%	characters. The only requirement is that   we avoid any filename
 | 
						|
%	special character in use.  The   current  encoding  use US-ASCII
 | 
						|
%	alnum characters, _ and %
 | 
						|
 | 
						|
url_to_filename(URL, FileName) :-
 | 
						|
	atomic(URL), !,
 | 
						|
	atom_codes(URL, Codes),
 | 
						|
	phrase(url_encode(EncCodes), Codes),
 | 
						|
	atom_codes(FileName, EncCodes).
 | 
						|
url_to_filename(URL, FileName) :-
 | 
						|
	www_form_encode(URL, FileName).
 | 
						|
 | 
						|
url_encode([0'+|T]) -->
 | 
						|
	" ", !,
 | 
						|
        url_encode(T).
 | 
						|
url_encode([C|T]) -->
 | 
						|
	alphanum(C), !,
 | 
						|
	url_encode(T).
 | 
						|
url_encode([C|T]) -->
 | 
						|
	no_enc_extra(C), !,
 | 
						|
	url_encode(T).
 | 
						|
url_encode(Enc) -->
 | 
						|
	(   "\r\n"
 | 
						|
	;   "\n"
 | 
						|
	), !,
 | 
						|
	{ append("%0D%0A", T, Enc)
 | 
						|
	},
 | 
						|
	url_encode(T).
 | 
						|
url_encode([]) -->
 | 
						|
	eos, !.
 | 
						|
url_encode([0'%,D1,D2|T]) -->
 | 
						|
	[C],
 | 
						|
	{ Dv1 is (C>>4 /\ 0xf),
 | 
						|
	  Dv2 is (C /\ 0xf),
 | 
						|
	  code_type(D1, xdigit(Dv1)),
 | 
						|
	  code_type(D2, xdigit(Dv2))
 | 
						|
	},
 | 
						|
	url_encode(T).
 | 
						|
 | 
						|
eos([], []).
 | 
						|
 | 
						|
alphanum(C) -->
 | 
						|
	[C],
 | 
						|
	{ C < 128,			% US-ASCII
 | 
						|
	  code_type(C, alnum)
 | 
						|
	}.
 | 
						|
 | 
						|
no_enc_extra(0'_) --> "_".		%'
 | 
						|
 | 
						|
 | 
						|
		 /*******************************
 | 
						|
		 *		UTIL		*
 | 
						|
		 *******************************/
 | 
						|
 | 
						|
%%	mkdir(+Directory)
 | 
						|
%
 | 
						|
%	Create a directory if it does not already exist.
 | 
						|
 | 
						|
mkdir(Directory) :-
 | 
						|
	exists_directory(Directory), !.
 | 
						|
mkdir(Directory) :-
 | 
						|
	make_directory(Directory).
 | 
						|
 | 
						|
%%	time_stamp(-Integer)
 | 
						|
%
 | 
						|
%	Return time-stamp rounded to integer.
 | 
						|
 | 
						|
time_stamp(Int) :-
 | 
						|
	get_time(Now),
 | 
						|
	Int is round(Now).
 | 
						|
 | 
						|
 | 
						|
		 /*******************************
 | 
						|
		 *	      MESSAGES		*
 | 
						|
		 *******************************/
 | 
						|
 | 
						|
:- multifile
 | 
						|
	prolog:message/3,
 | 
						|
	prolog:message_context/3.
 | 
						|
 | 
						|
prolog:message(rdf(Term)) -->
 | 
						|
	message(Term).
 | 
						|
 | 
						|
message(restore(attached(Graphs, Time/Wall))) -->
 | 
						|
	{ catch(Percent is round(100*Time/Wall), _, Percent = 0) },
 | 
						|
	[ 'Attached ~D graphs in ~2f seconds (~d% CPU = ~2f sec.)'-
 | 
						|
	  [Graphs, Wall, Percent, Time] ].
 | 
						|
message(restore(true, Action)) --> !,
 | 
						|
	silent_message(Action).
 | 
						|
message(restore(brief, Action)) --> !,
 | 
						|
	brief_message(Action).
 | 
						|
message(restore(_, source(DB, Nth, Total))) -->
 | 
						|
	{ file_base_name(DB, Base) },
 | 
						|
	[ 'Restoring ~w ... (~D of ~D graphs) '-[Base, Nth, Total], flush ].
 | 
						|
message(restore(_, snapshot(_))) -->
 | 
						|
	[ at_same_line, '(snapshot) '-[], flush ].
 | 
						|
message(restore(_, journal(_))) -->
 | 
						|
	[ at_same_line, '(journal) '-[], flush ].
 | 
						|
message(restore(_, done(_, Time, Count, _Nth, _Total))) -->
 | 
						|
	[ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
 | 
						|
message(update_failed(S,P,O,Action)) -->
 | 
						|
	[ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].
 | 
						|
 | 
						|
silent_message(_Action) --> [].
 | 
						|
 | 
						|
brief_message(source(DB, Nth, Total)) -->
 | 
						|
	{ file_base_name(DB, Base) },
 | 
						|
	[ at_same_line,
 | 
						|
	  '\r~w~`.t ~D of ~D graphs~72|'-[Base, Nth, Total],
 | 
						|
	  flush
 | 
						|
	].
 | 
						|
brief_message(snapshot(_File)) --> [].
 | 
						|
brief_message(journal(_File))  --> [].
 | 
						|
brief_message(done(_DB, _Time, _Count, _Nth, _Total)) --> [].
 | 
						|
 | 
						|
prolog:message_context(rdf_locked(Args)) -->
 | 
						|
	{ memberchk(time(Time), Args),
 | 
						|
	  memberchk(pid(Pid), Args),
 | 
						|
	  format_time(string(S), '%+', Time)
 | 
						|
	},
 | 
						|
	[ nl,
 | 
						|
	  'locked at ~s by process id ~w'-[S,Pid]
 | 
						|
	].
 |