/*  $Id$

    Part of SWI-Prolog

    Author:        Jan Wielemaker
    E-mail:        wielemak@science.uva.nl
    WWW:           http://www.swi-prolog.org
    Copyright (C): 1985-2007, University of Amsterdam

    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License
    as published by the Free Software Foundation; either version 2
    of the License, or (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU Lesser General Public
    License along with this library; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

    As a special exception, if you link this library with other files,
    compiled with a Free Software compiler, to produce an executable, this
    library does not by itself cause the resulting executable to be covered
    by the GNU General Public License. This exception does not however
    invalidate any other reasons why the executable file might be covered by
    the GNU General Public License.
*/
:- module(rdf_persistency,
          [ rdf_attach_db/2,            % +Directory, +Options
            rdf_detach_db/0,            % +Detach current DB
            rdf_current_db/1,           % -Directory
            rdf_persistency/2,          % +DB, +Bool
            rdf_flush_journals/1,       % +Options
            rdf_journal_file/2,         % ?DB, ?JournalFile
            rdf_db_to_file/2            % ?DB, ?FileBase
          ]).
:- use_module(library('semweb/rdf_db')).
:- use_module(library(debug)).
:- use_module(library(error)).
:- use_module(library(lists)).
:- use_module(library(pairs)).
:- use_module(library(thread)).
:- use_module(library(url)).
/** <module> RDF persistency plugin

This module provides persistency for rdf_db.pl based on the
rdf_monitor/2 predicate to track changes to the repository. Where
previous versions used autosave of the whole database using the
quick-load format of rdf_db, this version is based on a quick-load file
per source (4th argument of rdf/4), and journalling for edit operations.

The result is safe, avoids frequent small changes to large files which
makes synchronisation and backup expensive and avoids long disruption of
the server doing the autosave. Only loading large files disrupts service
for some time.

The persistent backup of the database is realised in a directory, using
a lock file to avoid corruption due to concurrent access. Each source is
represented by two files, the latest snapshot and a journal. The state
is restored by loading the snapshot and replaying the journal. The
predicate rdf_flush_journals/1 can be used to create fresh snapshots and
delete the journals.

@tbd    If there is a complete `.new' snapshot and no journal, we should
        move the .new to the plain snapshot name as a means of recovery.

@see    rdf_edit.pl
*/
% Bookkeeping state.  All of it is rebuilt from the store directory on
% rdf_attach_db/2, so it is declared volatile (not saved in saved
% states).
:- volatile
    rdf_directory/1,
    rdf_option/1,
    source_journal_fd/2,
    db_file_base/2,
    file_base_db/2.
:- dynamic
    rdf_directory/1,                    % Absolute path
    rdf_option/1,                       % Defined options
    source_journal_fd/2,                % DB, JournalFD
    db_file_base/2,                     % DB, FileBase
    file_base_db/2.                     % FileBase, DB

:- meta_predicate
    no_agc(0).
%%	rdf_attach_db(+Directory, +Options)
%
%	Start persistent operations using Directory as place to store
%	files.  There are several cases:
%
%	  * Empty DB, existing directory
%	  Load the DB from the existing directory
%
%	  * Full DB, empty directory
%	  Create snapshots for all sources in directory
%
%	Options:
%
%	  * concurrency(+Jobs)
%	  Number of threads to use for loading the initial
%	  database.  If not provided it is the number of CPUs
%	  as obtained from the flag =cpu_count=.
%
%	  * max_open_journals(+Count)
%	  Maximum number of journals kept open.  If not provided,
%	  the default is 10.  See limit_fd_pool/0.
%
%	  * silent(+BoolOrBrief)
%	  If =true= (default =false=), do not print informational
%	  messages.  Finally, if =brief= it will show minimal
%	  feedback.
%
%	  * log_nested_transactions(+Boolean)
%	  If =true=, nested _log_ transactions are added to the
%	  journal information.  By default (=false=), no log-term
%	  is added for nested transactions.

rdf_attach_db(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(write),
                         file_type(directory),
                         file_errors(fail)
                       ]), !,
    (   rdf_directory(Directory)
    ->  true                            % update settings?
    ;   rdf_detach_db,
        mkdir(Directory),
        lock_db(Directory),
        assert(rdf_directory(Directory)),
        assert_options(Options),
        stop_monitor,                   % make sure not to register load
        no_agc(load_db),
        at_halt(rdf_detach_db),
        start_monitor
    ).
rdf_attach_db(DirSpec, Options) :-
    % Directory does not yet exist (or is not writable as-is): enumerate
    % candidate locations, create/verify one, then retry with the
    % absolute path so the first clause does the real work.
    absolute_file_name(DirSpec,
                       Directory,
                       [ solutions(all)
                       ]),
    (   exists_directory(Directory)
    ->  access_file(Directory, write)
    ;   catch(make_directory(Directory), _, fail)
    ), !,
    rdf_attach_db(Directory, Options).
%	assert_options(+Options)
%
%	Type-check and record the options given to rdf_attach_db/2 as
%	rdf_option/1 facts.  Unknown options are silently ignored so
%	callers may pass through mixed option lists.

assert_options([]).
assert_options([H|T]) :-
    (   option_type(H, Check)
    ->  Check,                          % may raise type_error
        assert(rdf_option(H))
    ;   true                            % ignore options we do not understand
    ),
    assert_options(T).

%	option_type(?Option, -CheckGoal)
%
%	CheckGoal validates the argument of a recognised Option.

option_type(concurrency(X),             must_be(positive_integer, X)).
option_type(max_open_journals(X),       must_be(positive_integer, X)).
option_type(silent(X),                  must_be(oneof([true,false,brief]), X)).
option_type(log_nested_transactions(X), must_be(boolean, X)).


%%	no_agc(:Goal)
%
%	Run Goal with atom garbage collection disabled. Loading an RDF
%	database creates large amounts of atoms we *know* are not
%	garbage.

no_agc(Goal) :-
    current_prolog_flag(agc_margin, Old),
    set_prolog_flag(agc_margin, 0),
    call_cleanup(Goal, set_prolog_flag(agc_margin, Old)).
%%	rdf_detach_db is det.
%
%	Detach from the current database.  Succeeds silently if no
%	database is attached.  Normally called at the end of the program
%	through at_halt/1.

rdf_detach_db :-
    debug(halt, 'Detaching database', []),
    stop_monitor,
    close_journals,
    (   retract(rdf_directory(Dir))
    ->  debug(halt, 'DB Directory: ~w', [Dir]),
        retractall(rdf_option(_)),
        retractall(source_journal_fd(_,_)),
        retractall(db_file_base(_,_)),
        unlock_db(Dir)
    ;   true
    ).


%%	rdf_current_db(?Dir)
%
%	True if Dir is the current RDF persistent database.

rdf_current_db(Directory) :-
    rdf_directory(Dir), !,
    Dir = Directory.


%%	rdf_flush_journals(+Options)
%
%	Flush dirty journals.  Options:
%
%	  * min_size(+KB)
%	  Only flush if journal is over KB in size.
%
%	@tbd Provide a sensible default size.

rdf_flush_journals(Options) :-
    forall(rdf_source(DB),
           rdf_flush_journal(DB, Options)).

%	rdf_flush_journal(+DB, +Options)
%
%	Write a fresh snapshot for DB (which deletes the journal) unless
%	the journal does not exist or is below the min_size(KB) limit.

rdf_flush_journal(DB, Options) :-
    db_files(DB, _SnapshotFile, JournalFile),
    db_file(JournalFile, File),
    (   \+ exists_file(File)
    ->  true
    ;   memberchk(min_size(KB), Options),
        size_file(File, Size),          % was size_file(JournalFile, ...):
                                        % JournalFile is relative to the
                                        % store directory, not the CWD
        Size / 1024 < KB
    ->  true
    ;   create_db(DB)
    ).
/*******************************
|
||
|
* LOAD *
|
||
|
*******************************/
|
||
|
|
||
|
%% load_db is det.
|
||
|
%
|
||
|
% Reload database from the directory specified by rdf_directory/1.
|
||
|
% First we find all names graphs using find_dbs/1 and then we load
|
||
|
% them.
|
||
|
|
||
|
load_db :-
|
||
|
rdf_directory(Dir),
|
||
|
working_directory(Old, Dir),
|
||
|
get_time(Wall0),
|
||
|
statistics(cputime, T0),
|
||
|
call_cleanup(find_dbs(DBs), working_directory(_, Old)),
|
||
|
length(DBs, DBCount),
|
||
|
verbosity(DBCount, Silent),
|
||
|
make_goals(DBs, Silent, 1, DBCount, Goals),
|
||
|
concurrency(Jobs),
|
||
|
concurrent(Jobs, Goals, []),
|
||
|
statistics(cputime, T1),
|
||
|
get_time(Wall1),
|
||
|
T is T1 - T0,
|
||
|
Wall is Wall1 - Wall0,
|
||
|
message_level(Silent, Level),
|
||
|
print_message(Level, rdf(restore(attached(DBCount, T/Wall)))).
|
||
|
|
||
|
|
||
|
make_goals([], _, _, _, []).
|
||
|
make_goals([DB|T0], Silent, I, Total,
|
||
|
[load_source(DB, Silent, I, Total)|T]) :-
|
||
|
I2 is I + 1,
|
||
|
make_goals(T0, Silent, I2, Total, T).
|
||
|
|
||
|
verbosity(_DBCount, Silent) :-
|
||
|
rdf_option(silent(Silent)), !.
|
||
|
verbosity(DBCount, Silent) :-
|
||
|
DBCount > 25, !,
|
||
|
Silent = brief.
|
||
|
verbosity(_DBCount, false).
|
||
|
|
||
|
|
||
|
%% concurrency(-Jobs)
|
||
|
%
|
||
|
% Number of jobs to run concurrently.
|
||
|
|
||
|
concurrency(Jobs) :-
|
||
|
rdf_option(concurrency(Jobs)), !.
|
||
|
concurrency(Jobs) :-
|
||
|
current_prolog_flag(cpu_count, Jobs),
|
||
|
Jobs > 0, !.
|
||
|
concurrency(1).
|
||
|
|
||
|
|
||
|
%%	find_dbs(-DBs:list(atom)) is det.
%
%	DBs is a list of database (named graph) names, sorted in
%	increasing file-size.  Small files are loaded first as these
%	typically contain the schemas and we want to avoid re-hashing
%	large databases due to added rdfs:subPropertyOf triples.
%
%	Must be called with the store directory as working directory.

find_dbs(DBs) :-
    expand_file_name(*, Files),
    phrase(scan_db_files(Files), Scanned),
    sort(Scanned, ByDB),                % groups snapshot+journal per DB
    join_snapshot_and_journals(ByDB, BySize),
    keysort(BySize, SortedBySize),
    pairs_values(SortedBySize, DBs).


%%	scan_db_files(+Files)// is det.
%
%	Produces a list of db(DB, Size, File) for all recognised RDF
%	database files.  Unrecognised files are skipped.

scan_db_files([]) -->
    [].
scan_db_files([File|T]) -->
    { file_name_extension(Base, Ext, File),
      db_extension(Ext), !,
      rdf_db_to_file(DB, Base),
      size_file(File, Size)
    },
    [ db(DB, Size, File) ],
    scan_db_files(T).
scan_db_files([_|T]) -->
    scan_db_files(T).

%	db_extension(?Ext): file extensions owned by the store.
db_extension(trp).                      % snapshot (triple) files
db_extension(jrn).                      % journal files

%	join_snapshot_and_journals(+ByDB, -SizeDBPairs)
%
%	Merge consecutive db/3 terms for the same graph (snapshot and
%	journal), summing their sizes into Size-DB pairs for keysort/2.

join_snapshot_and_journals([], []).
join_snapshot_and_journals([db(DB,S0,_)|T0], [S-DB|T]) :- !,
    same_db(DB, T0, T1, S0, S),
    join_snapshot_and_journals(T1, T).

%	same_db(+DB, +Terms0, -Terms, +Size0, -Size)
%
%	Accumulate the sizes of leading db/3 terms for DB.

same_db(DB, [db(DB,S1,_)|T0], T, S0, S) :- !,
    S2 is S0+S1,
    same_db(DB, T0, T, S2, S).
same_db(_, L, L, S, S).
%%	load_source(+DB, +Silent, +Nth, +Total) is det.
%
%	Load triples and reload journal from the indicated snapshot
%	file.  Any triples already present in graph DB are first
%	removed so the result reflects the persistent state exactly.
%
%	@param Silent	One of =false=, =true= or =brief=

load_source(DB, Silent, Nth, Total) :-
    message_level(Silent, Level),
    db_files(DB, SnapshotFile, JournalFile),
    rdf_retractall(_,_,_,DB),
    statistics(cputime, T0),
    print_message(Level, rdf(restore(Silent, source(DB, Nth, Total)))),
    db_file(SnapshotFile, AbsSnapShot),
    (   exists_file(AbsSnapShot)
    ->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
        rdf_load_db(AbsSnapShot)
    ;   true
    ),
    (   exists_db(JournalFile)
    ->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
        load_journal(JournalFile, DB)
    ;   true
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    (   rdf_statistics(triples_by_file(DB, Count))
    ->  true
    ;   Count = 0                       % empty or unknown graph
    ),
    print_message(Level, rdf(restore(Silent,
                                     done(DB, T, Count, Nth, Total)))).

%	message_level(+Silent, -Level)
%
%	Map the silent option value onto a print_message/2 level.

message_level(true, silent) :- !.
message_level(_, informational).
/*******************************
|
||
|
* LOAD JOURNAL *
|
||
|
*******************************/
|
||
|
|
||
|
%% load_journal(+File:atom, +DB:atom) is det.
|
||
|
%
|
||
|
% Process transactions from the RDF journal File, adding the given
|
||
|
% named graph.
|
||
|
|
||
|
load_journal(File, DB) :-
|
||
|
open_db(File, read, In, []),
|
||
|
call_cleanup(( read(In, T0),
|
||
|
process_journal(T0, In, DB)
|
||
|
),
|
||
|
close(In)).
|
||
|
|
||
|
process_journal(end_of_file, _, _) :- !.
|
||
|
process_journal(Term, In, DB) :-
|
||
|
( process_journal_term(Term, DB)
|
||
|
-> true
|
||
|
; throw(error(type_error(journal_term, Term), _))
|
||
|
),
|
||
|
read(In, T2),
|
||
|
process_journal(T2, In, DB).
|
||
|
|
||
|
process_journal_term(assert(S,P,O), DB) :-
|
||
|
rdf_assert(S,P,O,DB).
|
||
|
process_journal_term(assert(S,P,O,Line), DB) :-
|
||
|
rdf_assert(S,P,O,DB:Line).
|
||
|
process_journal_term(retract(S,P,O), DB) :-
|
||
|
rdf_retractall(S,P,O,DB).
|
||
|
process_journal_term(retract(S,P,O,Line), DB) :-
|
||
|
rdf_retractall(S,P,O,DB:Line).
|
||
|
process_journal_term(update(S,P,O,Action), DB) :-
|
||
|
( rdf_update(S,P,O,DB, Action)
|
||
|
-> true
|
||
|
; print_message(warning, rdf(update_failed(S,P,O,Action)))
|
||
|
).
|
||
|
process_journal_term(start(_), _). % journal open/close
|
||
|
process_journal_term(end(_), _).
|
||
|
process_journal_term(begin(_), _). % logged transaction (compatibility)
|
||
|
process_journal_term(end, _).
|
||
|
process_journal_term(begin(_,_,_,_), _). % logged transaction (current)
|
||
|
process_journal_term(end(_,_,_), _).
|
||
|
|
||
|
|
||
|
/*******************************
|
||
|
* CREATE JOURNAL *
|
||
|
*******************************/
|
||
|
|
||
|
:- dynamic
|
||
|
blocked_db/2, % DB, Reason
|
||
|
transaction_message/3, % Nesting, Time, Message
|
||
|
transaction_db/3. % Nesting, DB, Id
|
||
|
|
||
|
%% rdf_persistency(+DB, Bool)
|
||
|
%
|
||
|
% Specify whether a database is persistent. Switching to =false=
|
||
|
% kills the persistent state. Switching to =true= creates it.
|
||
|
|
||
|
rdf_persistency(DB, Bool) :-
|
||
|
must_be(atom, DB),
|
||
|
must_be(boolean, Bool),
|
||
|
fail.
|
||
|
rdf_persistency(DB, false) :- !,
|
||
|
( blocked_db(DB, persistency)
|
||
|
-> true
|
||
|
; assert(blocked_db(DB, persistency)),
|
||
|
delete_db(DB)
|
||
|
).
|
||
|
rdf_persistency(DB, true) :-
|
||
|
( retract(blocked_db(DB, persistency))
|
||
|
-> create_db(DB)
|
||
|
; true
|
||
|
).
|
||
|
|
||
|
|
||
|
%% start_monitor is det.
|
||
|
%% stop_monitor is det.
|
||
|
%
|
||
|
% Start/stop monitoring the RDF database for changes and update
|
||
|
% the journal.
|
||
|
|
||
|
start_monitor :-
|
||
|
rdf_monitor(monitor,
|
||
|
[ -assert(load)
|
||
|
]).
|
||
|
stop_monitor :-
|
||
|
rdf_monitor(monitor,
|
||
|
[ -all
|
||
|
]).
|
||
|
|
||
|
%%	monitor(+Term) is semidet.
%
%	Handle an rdf_monitor/2 callback to deal with persistency.  Note
%	that the monitor calls that come from rdf_db.pl that deal with
%	database changes are serialized.  They do come from different
%	threads though.

monitor(Msg) :-
    % Debug-only clause; always fails through to the real handlers.
    debug(monitor, 'Monitor: ~p~n', [Msg]),
    fail.
monitor(assert(S,P,O,DB:Line)) :- !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(assert(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB:Line)) :- !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O)]),
    sync_journal(DB, Fd).
monitor(update(S,P,O,DB:Line,Action)) :- !,
    \+ blocked_db(DB, _),
    (   Action = source(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB:Line))
    ;   journal_fd(DB, Fd),
        % NOTE(review): unlike all sibling clauses, this one does not
        % call open_transaction(DB, Fd) before writing -- confirm
        % whether that is intentional.
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(update(S,P,O,DB,Action)) :-
    \+ blocked_db(DB, _),
    (   Action = source(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB))
    ;   journal_fd(DB, Fd),
        open_transaction(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(load(BE, Id)) :-
    (   BE == begin
    ->  push_state(Id)
    ;   sync_state(Id)
    ).
monitor(transaction(BE, Id)) :-
    monitor_transaction(Id, BE).

%	monitor_transaction(+Id, +BeginEnd)
%
%	Handle transaction begin/end monitor events.  Loading from the
%	journal and parsing files temporarily _block_ journalling for
%	the affected graph; logged transactions are written to the
%	journals of all graphs they touch.

monitor_transaction(load_journal(DB), begin(_)) :- !,
    assert(blocked_db(DB, journal)).
monitor_transaction(load_journal(DB), end(_)) :- !,
    retractall(blocked_db(DB, journal)).

monitor_transaction(parse(URI), begin(_)) :- !,
    (   blocked_db(URI, persistency)
    ->  true
    ;   assert(blocked_db(URI, parse))
    ).
monitor_transaction(parse(URI), end(_)) :- !,
    (   retract(blocked_db(URI, parse))
    ->  create_db(URI)
    ;   true
    ).
monitor_transaction(unload(DB), begin(_)) :- !,
    (   blocked_db(DB, persistency)
    ->  true
    ;   assert(blocked_db(DB, unload))
    ).
monitor_transaction(unload(DB), end(_)) :- !,
    (   retract(blocked_db(DB, unload))
    ->  delete_db(DB)
    ;   true
    ).
monitor_transaction(log(Msg), begin(N)) :- !,
    check_nested(N),
    get_time(Time),
    asserta(transaction_message(N, Time, Msg)).
monitor_transaction(log(_), end(N)) :-
    check_nested(N),
    retract(transaction_message(N, _, _)), !,
    findall(DB:Id, retract(transaction_db(N, DB, Id)), DBs),
    end_transactions(DBs, N).
monitor_transaction(log(Msg, DB), begin(N)) :- !,
    check_nested(N),
    get_time(Time),
    asserta(transaction_message(N, Time, Msg)),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd).
monitor_transaction(log(Msg, _DB), end(N)) :-
    monitor_transaction(log(Msg), end(N)).
monitor_transaction(reset, begin(L)) :-
    forall(rdf_graph(DB),
           monitor_transaction(unload(DB), begin(L))).
monitor_transaction(reset, end(L)) :-
    forall(blocked_db(DB, unload),
           monitor_transaction(unload(DB), end(L))),
    retractall(current_transaction_id(_,_)).
%%	check_nested(+Level) is semidet.
%
%	True if we must log this transaction.  This is always the case
%	for toplevel transactions.  Nested transactions are only logged
%	if log_nested_transactions(true) is defined.

check_nested(0) :- !.
check_nested(_) :-
    rdf_option(log_nested_transactions(true)).


%%	open_transaction(+DB, +Fd) is det.
%
%	Add a begin(Id, Level, Time, Message) term if a transaction
%	involves DB.  Id is an incremental integer, where each database
%	has its own counter.  Level is the nesting level, Time a floating
%	point timestamp and Message the message provided as argument to
%	the log message.

open_transaction(DB, Fd) :-
    transaction_message(N, Time, Msg), !,
    (   transaction_db(N, DB, _)
    ->  true                            % begin-term already written
    ;   next_transaction_id(DB, Id),
        assert(transaction_db(N, DB, Id)),
        format(Fd, 'begin(~q, ~q, ~2f, ~q).~n', [Id, N, Time, Msg])
    ).
open_transaction(_,_).                  % not inside a logged transaction


%%	next_transaction_id(+DB, -Id) is det.
%
%	Id is the number to use for the next logged transaction on DB.
%	Transactions in each named graph are numbered in sequence.
%	Searching the Id of the last transaction is performed by the 2nd
%	clause starting 1Kb from the end and doubling this offset each
%	failure.

:- dynamic
    current_transaction_id/2.

next_transaction_id(DB, Id) :-
    retract(current_transaction_id(DB, Last)), !,
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, Id) :-
    db_files(DB, _, Journal),
    exists_file(Journal), !,
    size_file(Journal, Size),
    open_db(Journal, read, In, []),
    call_cleanup(iterative_expand(In, Size, Last), close(In)),
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, 1) :-
    assert(current_transaction_id(DB, 1)).

%	iterative_expand(+In, +Size, -Last)
%
%	Find the id of the last end/3 record in the journal by scanning
%	growing tail sections (1Kb, 2Kb, ...), falling back to a full
%	scan.

iterative_expand(_, 0, 0) :- !.
iterative_expand(In, Size, Last) :-     % Scan growing sections from the end
    Max is floor(log(Size)/log(2)),
    between(10, Max, Step),
    Offset is -(1<<Step),
    seek(In, Offset, eof, _),
    skip(In, 10),                       % records are line-based
    read(In, T0),
    last_transaction_id(T0, In, 0, Last),
    Last > 0, !.
iterative_expand(In, _, Last) :-        % Scan the whole file
    seek(In, 0, bof, _),
    read(In, T0),
    last_transaction_id(T0, In, 0, Last).

%	last_transaction_id(+Term, +In, +Id0, -Id)
%
%	Id is the argument of the last end/3 term read from In.

last_transaction_id(end_of_file, _, Last, Last) :- !.
last_transaction_id(end(Id, _, _), In, _, Last) :-
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).
last_transaction_id(_, In, Id, Last) :-
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).
%%	end_transactions(+DBs:list(atom:id)) is det.
%
%	End a transaction that affected the given list of databases.  We
%	write the list of other affected databases as an argument to the
%	end-term to facilitate fast finding of the related transactions.
%
%	In each database, the transaction is ended with a term end(Id,
%	Nesting, Others), where Id and Nesting are the transaction
%	identifier and nesting (see open_transaction/2) and Others is a
%	list of DB:Id, indicating other databases affected by the
%	transaction.

end_transactions(DBs, N) :-
    end_transactions(DBs, DBs, N).

end_transactions([], _, _).
end_transactions([DB:Id|T], DBs, N) :-
    journal_fd(DB, Fd),
    once(select(DB:Id, DBs, Others)),
    format(Fd, 'end(~q, ~q, ~q).~n', [Id, N, Others]),
    sync_journal(DB, Fd),
    end_transactions(T, DBs, N).


%	State handling.  We use this for trapping changes by
%	rdf_load_db/1.  In theory, loading such files can add triples to
%	multiple sources.  In practice this rarely happens.  We save the
%	current state and sync all files that have changed.  The only
%	drawback of this approach is that loaded files spreading triples
%	over multiple databases cause all these databases to be fully
%	synchronised.  This shouldn't happen very often.

:- dynamic
    pre_load_state/2.

%	push_state(+Id): record the per-graph MD5 state before a load.
push_state(Id) :-
    get_state(State),
    asserta(pre_load_state(Id, State)).

%	get_state(-State): sorted list of DB-MD5 pairs for all graphs.
get_state(State) :-
    findall(DB-MD5, (rdf_graph(DB), rdf_md5(DB, MD5)), State0),
    keysort(State0, State).

%	sync_state(+Id): snapshot every graph whose MD5 changed since
%	the matching push_state/1.
sync_state(Id) :-
    retract(pre_load_state(Id, PreState)),
    get_state(AfterState),
    sync_state(AfterState, PreState).

sync_state([], _).
sync_state([DB-MD5|TA], Pre) :-
    (   memberchk(DB-MD5P, Pre),
        MD5P == MD5
    ->  true                            % unchanged
    ;   create_db(DB)
    ),
    sync_state(TA, Pre).
/*******************************
|
||
|
* JOURNAL FILES *
|
||
|
*******************************/
|
||
|
|
||
|
%% journal_fd(+DB, -Stream) is det.
|
||
|
%
|
||
|
% Get an open stream to a journal. If the journal is not open, old
|
||
|
% journals are closed to satisfy the =max_open_journals= option.
|
||
|
% Then the journal is opened in =append= mode. Journal files are
|
||
|
% always encoded as UTF-8 for portability as well as to ensure
|
||
|
% full coverage of Unicode.
|
||
|
|
||
|
journal_fd(DB, Fd) :-
|
||
|
source_journal_fd(DB, Fd), !.
|
||
|
journal_fd(DB, Fd) :-
|
||
|
with_mutex(rdf_journal_file,
|
||
|
journal_fd_(DB, Out)),
|
||
|
Fd = Out.
|
||
|
|
||
|
journal_fd_(DB, Fd) :-
|
||
|
source_journal_fd(DB, Fd), !.
|
||
|
journal_fd_(DB, Fd) :-
|
||
|
limit_fd_pool,
|
||
|
db_files(DB, _Snapshot, Journal),
|
||
|
open_db(Journal, append, Fd,
|
||
|
[ close_on_abort(false)
|
||
|
]),
|
||
|
time_stamp(Now),
|
||
|
format(Fd, '~q.~n', [start([time(Now)])]),
|
||
|
assert(source_journal_fd(DB, Fd)). % new one at the end
|
||
|
|
||
|
%% limit_fd_pool is det.
|
||
|
%
|
||
|
% Limit the number of open journals to max_open_journals (10).
|
||
|
% Note that calls from rdf_monitor/2 are issued in different
|
||
|
% threads, but as they are part of write operations they are fully
|
||
|
% synchronised.
|
||
|
|
||
|
limit_fd_pool :-
|
||
|
predicate_property(source_journal_fd(_, _), number_of_clauses(N)), !,
|
||
|
( rdf_option(max_open_journals(Max))
|
||
|
-> true
|
||
|
; Max = 10
|
||
|
),
|
||
|
Close is N - Max,
|
||
|
forall(between(1, Close, _),
|
||
|
close_oldest_journal).
|
||
|
limit_fd_pool.
|
||
|
|
||
|
close_oldest_journal :-
|
||
|
source_journal_fd(DB, _Fd), !,
|
||
|
debug(rdf_persistency, 'Closing old journal for ~q', [DB]),
|
||
|
close_journal(DB).
|
||
|
close_oldest_journal.
|
||
|
|
||
|
|
||
|
%% sync_journal(+DB, +Fd)
|
||
|
%
|
||
|
% Sync journal represented by database and stream. If the DB is
|
||
|
% involved in a transaction there is no point flushing until the
|
||
|
% end of the transaction.
|
||
|
|
||
|
sync_journal(DB, _) :-
|
||
|
transaction_db(_, DB, _), !.
|
||
|
sync_journal(_, Fd) :-
|
||
|
flush_output(Fd).
|
||
|
|
||
|
%% close_journal(+DB) is det.
|
||
|
%
|
||
|
% Close the journal associated with DB if it is open.
|
||
|
|
||
|
close_journal(DB) :-
|
||
|
with_mutex(rdf_journal_file,
|
||
|
close_journal_(DB)).
|
||
|
|
||
|
close_journal_(DB) :-
|
||
|
( retract(source_journal_fd(DB, Fd))
|
||
|
-> time_stamp(Now),
|
||
|
format(Fd, '~q.~n', [end([time(Now)])]),
|
||
|
close(Fd, [force(true)])
|
||
|
; true
|
||
|
).
|
||
|
|
||
|
% close_journals
|
||
|
%
|
||
|
% Close all open journals.
|
||
|
|
||
|
close_journals :-
|
||
|
forall(source_journal_fd(DB, _),
|
||
|
catch(close_journal(DB), E,
|
||
|
print_message(error, E))).
|
||
|
|
||
|
%%	create_db(+DB)
%
%	Create a saved version of DB in corresponding file, close and
%	delete journals.  The snapshot is first written to a `.new' file
%	and only renamed over the old snapshot on success, so a failed
%	save never destroys the previous state.

create_db(DB) :-
    debug(rdf_persistency, 'Saving DB ~w', [DB]),
    db_abs_files(DB, Snapshot, Journal),
    atom_concat(Snapshot, '.new', NewSnapshot),
    (   catch(rdf_save_db(NewSnapshot, DB), _, fail)
    ->  (   exists_file(Journal)
        ->  delete_file(Journal)
        ;   true
        ),
        rename_file(NewSnapshot, Snapshot),
        debug(rdf_persistency, 'Saved DB ~w', [DB])
    ;   catch(delete_file(NewSnapshot), _, true)  % best-effort cleanup
    ).


%%	delete_db(+DB)
%
%	Remove snapshot and journal file for DB.

delete_db(DB) :-
    db_abs_files(DB, Snapshot, Journal),
    (   exists_file(Journal)
    ->  delete_file(Journal)
    ;   true
    ),
    (   exists_file(Snapshot)
    ->  delete_file(Snapshot)
    ;   true
    ).
/*******************************
|
||
|
* LOCKING *
|
||
|
*******************************/
|
||
|
|
||
|
%% lock_db(+Dir)
|
||
|
%
|
||
|
% Lock the database directory. This isn't safe as the file
|
||
|
% operations are not atomic. Needs re-thinking, but with the
|
||
|
% normal server setting it should be ok.
|
||
|
|
||
|
lock_db(Dir) :-
|
||
|
lockfile(Dir, File),
|
||
|
exists_file(File), !,
|
||
|
( catch(read_file_to_terms(File, Terms, []), _, fail),
|
||
|
Terms = [locked(Args)]
|
||
|
-> Context = rdf_locked(Args)
|
||
|
; Context = context(_, 'Database is in use')
|
||
|
),
|
||
|
throw(error(permission_error(lock, rdf_db, Dir), Context)).
|
||
|
lock_db(Dir) :-
|
||
|
lockfile(Dir, File),
|
||
|
open(File, write, Out),
|
||
|
( current_prolog_flag(pid, PID)
|
||
|
-> true
|
||
|
; PID = 0 % TBD: Fix in Prolog
|
||
|
),
|
||
|
time_stamp(Now),
|
||
|
format(Out, '/* RDF Database is in use */~n~n', []),
|
||
|
format(Out, '~q.~n', [ locked([ time(Now),
|
||
|
pid(PID)
|
||
|
])
|
||
|
]),
|
||
|
close(Out),
|
||
|
at_halt(unlock_db(Dir)).
|
||
|
|
||
|
unlock_db(Dir) :-
|
||
|
lockfile(Dir, File),
|
||
|
( exists_file(File)
|
||
|
-> delete_file(File)
|
||
|
; true
|
||
|
).
|
||
|
|
||
|
/*******************************
|
||
|
* FILENAMES *
|
||
|
*******************************/
|
||
|
|
||
|
lockfile(Dir, LockFile) :-
|
||
|
atomic_list_concat([Dir, /, lock], LockFile).
|
||
|
|
||
|
db_file(Base, File) :-
|
||
|
rdf_directory(Dir),
|
||
|
atomic_list_concat([Dir, /, Base], File).
|
||
|
|
||
|
open_db(Base, Mode, Stream, Options) :-
|
||
|
db_file(Base, File),
|
||
|
open(File, Mode, Stream, [encoding(utf8)|Options]).
|
||
|
|
||
|
exists_db(Base) :-
|
||
|
db_file(Base, File),
|
||
|
exists_file(File).
|
||
|
|
||
|
%% db_files(+DB, -Snapshot, -Journal).
|
||
|
%% db_files(-DB, +Snapshot, -Journal).
|
||
|
%% db_files(-DB, -Snapshot, +Journal).
|
||
|
%
|
||
|
% True if named graph DB is represented by the files Snapshot and
|
||
|
% Journal. The filenames are local to the directory representing
|
||
|
% the store.
|
||
|
|
||
|
db_files(DB, Snapshot, Journal) :-
|
||
|
nonvar(DB), !,
|
||
|
rdf_db_to_file(DB, Base),
|
||
|
atom_concat(Base, '.trp', Snapshot),
|
||
|
atom_concat(Base, '.jrn', Journal).
|
||
|
db_files(DB, Snapshot, Journal) :-
|
||
|
nonvar(Snapshot), !,
|
||
|
atom_concat(Base, '.trp', Snapshot),
|
||
|
atom_concat(Base, '.jrn', Journal),
|
||
|
rdf_db_to_file(DB, Base).
|
||
|
db_files(DB, Snapshot, Journal) :-
|
||
|
nonvar(Journal), !,
|
||
|
atom_concat(Base, '.jrn', Journal),
|
||
|
atom_concat(Base, '.trp', Snapshot),
|
||
|
rdf_db_to_file(DB, Base).
|
||
|
|
||
|
db_abs_files(DB, Snapshot, Journal) :-
|
||
|
db_files(DB, Snapshot0, Journal0),
|
||
|
db_file(Snapshot0, Snapshot),
|
||
|
db_file(Journal0, Journal).
|
||
|
|
||
|
|
||
|
%%	rdf_journal_file(+DB, -File) is semidet.
%%	rdf_journal_file(-DB, -File) is nondet.
%
%	True if File is the absolute file name of an existing named
%	graph DB.
%
%	@tbd Avoid using private rdf_db:rdf_graphs_/1.

rdf_journal_file(DB, Journal) :-
    (   var(DB)
    ->  rdf_db:rdf_graphs_(All),        % also pick the empty graphs
        member(DB, All)
    ;   true
    ),
    db_abs_files(DB, _Snapshot, Journal),
    exists_file(Journal).


%%	rdf_db_to_file(+DB, -File) is det.
%%	rdf_db_to_file(-DB, +File) is det.
%
%	Translate between database encoding (often a file or URL) and
%	the name we store in the directory.  We keep a cache for two
%	reasons.  Speed, but much more important is that the mapping of
%	raw --> encoded provided by www_form_encode/2 is not guaranteed
%	to be unique by the W3C standards.
%
%	@tbd	We keep two predicates for exploiting Prolog indexing.
%		Once multi-argument indexing is hashed we should clean
%		this up.

rdf_db_to_file(DB, File) :-
    nonvar(File),
    file_base_db(File, DB), !.
rdf_db_to_file(DB, File) :-
    nonvar(DB),
    db_file_base(DB, File), !.
rdf_db_to_file(DB, File) :-
    url_to_filename(DB, File),
    assert(db_file_base(DB, File)),     % cache in both directions
    assert(file_base_db(File, DB)).
%%	url_to_filename(+URL, -FileName) is det.
%%	url_to_filename(-URL, +FileName) is det.
%
%	Turn a valid URL into a filename.  Earlier versions used
%	www_form_encode/2, but this can produce characters that are not
%	valid in filenames.  We use the same encoding as
%	www_form_encode/2, but with our own rules for allowed
%	characters.  The only requirement is that we avoid any filename
%	special character in use.  The current encoding uses US-ASCII
%	alnum characters, _ and %.  The second clause handles the
%	decoding direction (FileName bound), for which
%	www_form_encode/2 is safe.

url_to_filename(URL, FileName) :-
	atomic(URL), !,
	atom_codes(URL, Codes),
	phrase(url_encode(EncCodes), Codes),
	atom_codes(FileName, EncCodes).
url_to_filename(URL, FileName) :-
	www_form_encode(URL, FileName).

%	url_encode(-Encoded)// is det.
%
%	Encode the input code list using www-form-encoding restricted
%	to characters that are safe in filenames: space becomes +,
%	US-ASCII alnum and _ are copied verbatim, line endings are
%	normalised to %0D%0A and everything else becomes %XY hex
%	escapes.

url_encode([0'+|T]) -->
	" ", !,
	url_encode(T).
url_encode([C|T]) -->
	alphanum(C), !,
	url_encode(T).
url_encode([C|T]) -->
	no_enc_extra(C), !,
	url_encode(T).
url_encode(Enc) -->			% normalise \r\n and bare \n
	(   "\r\n"
	;   "\n"
	), !,
	{ append("%0D%0A", T, Enc)
	},
	url_encode(T).
url_encode([]) -->
	eos, !.
url_encode([0'%,D1,D2|T]) -->		% any other byte: %XY escape
	[C],
	{ Dv1 is (C>>4 /\ 0xf),
	  Dv2 is (C /\ 0xf),
	  code_type(D1, xdigit(Dv1)),
	  code_type(D2, xdigit(Dv2))
	},
	url_encode(T).

%	Matches only at the end of the input.

eos([], []).

%	US-ASCII alphanumerical character.

alphanum(C) -->
	[C],
	{ C < 128,			% US-ASCII
	  code_type(C, alnum)
	}.

%	Extra characters that need no encoding.

no_enc_extra(0'_) --> "_". %'

		 /*******************************
		 *	       UTIL		*
		 *******************************/

%%	mkdir(+Directory)
%
%	Create Directory if it does not already exist.  Note that
%	make_directory/1 raises an exception if Directory appears
%	between the existence test and the creation (race); callers
%	must be prepared for that.

mkdir(Directory) :-
	exists_directory(Directory), !.
mkdir(Directory) :-
	make_directory(Directory).

%%	time_stamp(-Integer)
%
%	Return the current time-stamp (get_time/1) rounded to an
%	integer number of seconds.

time_stamp(Int) :-
	get_time(Now),
	Int is round(Now).

		 /*******************************
		 *	     MESSAGES		*
		 *******************************/

:- multifile
	prolog:message/3,
	prolog:message_context/3.

%	Hook rdf(Term) messages into the system message infrastructure.

prolog:message(rdf(Term)) -->
	message(Term).

%	message(+Term)// is semidet.
%
%	Render progress and error messages for the persistency layer.
%	The restore(Silent, Action) clauses dispatch on the silent
%	option: =true= suppresses output, =brief= gives one-line
%	progress, anything else gives full per-graph reporting.

message(restore(attached(Graphs, Time/Wall))) -->
	{ % Wall may be 0 (or Time/Wall unbound on error); fall back to 0%
	  catch(Percent is round(100*Time/Wall), _, Percent = 0)
	},
	[ 'Attached ~D graphs in ~2f seconds (~d% CPU = ~2f sec.)'-
	  [Graphs, Wall, Percent, Time] ].
message(restore(true, Action)) --> !,
	silent_message(Action).
message(restore(brief, Action)) --> !,
	brief_message(Action).
message(restore(_, source(DB, Nth, Total))) -->
	{ file_base_name(DB, Base) },
	[ 'Restoring ~w ... (~D of ~D graphs) '-[Base, Nth, Total], flush ].
message(restore(_, snapshot(_))) -->
	[ at_same_line, '(snapshot) '-[], flush ].
message(restore(_, journal(_))) -->
	[ at_same_line, '(journal) '-[], flush ].
message(restore(_, done(_, Time, Count, _Nth, _Total))) -->
	[ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
message(update_failed(S,P,O,Action)) -->
	[ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].

%	No output in silent mode.

silent_message(_Action) --> [].

%	One overwritten progress line per restored graph.

brief_message(source(DB, Nth, Total)) -->
	{ file_base_name(DB, Base) },
	[ at_same_line,
	  '\r~w~`.t ~D of ~D graphs~72|'-[Base, Nth, Total],
	  flush
	].
brief_message(snapshot(_File)) --> [].
brief_message(journal(_File)) --> [].
brief_message(done(_DB, _Time, _Count, _Nth, _Total)) --> [].

%	Extend error-message context for rdf_locked(Args) errors with
%	the lock time and owning process id taken from Args.

prolog:message_context(rdf_locked(Args)) -->
	{ memberchk(time(Time), Args),
	  memberchk(pid(Pid), Args),
	  format_time(string(S), '%+', Time)
	},
	[ nl,
	  'locked at ~s by process id ~w'-[S,Pid]
	].