    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2006-2015, University of Amsterdam
    7                              VU University Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
:- module(rdf_persistency,
          [ rdf_attach_db/2,            % +Directory, +Options
            rdf_detach_db/0,            % +Detach current Graph
            rdf_current_db/1,           % -Directory
            rdf_persistency/2,          % +Graph, +Bool
            rdf_flush_journals/1,       % +Options
            rdf_persistency_property/1, % ?Property
            rdf_journal_file/2,         % ?Graph, ?JournalFile
            rdf_snapshot_file/2,        % ?Graph, ?SnapshotFile
            rdf_db_to_file/2            % ?Graph, ?FileBase
          ]).
:- use_module(library(semweb/rdf_db),
              [ rdf_graph/1, rdf_unload_graph/1, rdf_statistics/1,
                rdf_load_db/1, rdf_retractall/4, rdf_create_graph/1,
                rdf_assert/4, rdf_update/5, rdf_monitor/2, rdf/4,
                rdf_save_db/2, rdf_atom_md5/3, rdf_current_ns/2,
                rdf_register_ns/3
              ]).

:- autoload(library(apply),[maplist/2,maplist/3,partition/4,exclude/3]).
:- use_module(library(debug),[debug/3]).
:- autoload(library(error),
            [permission_error/3,must_be/2,domain_error/2]).
:- autoload(library(filesex),
            [directory_file_path/3,make_directory_path/1]).
:- autoload(library(lists),[select/3,append/3]).
:- autoload(library(option),[option/2,option/3]).
:- autoload(library(readutil),[read_file_to_terms/3]).
:- autoload(library(socket),[gethostname/1]).
:- autoload(library(thread),[concurrent/3]).
:- autoload(library(uri),[uri_encoded/3]).

RDF persistency plugin

This module provides persistency for rdf_db.pl based on the rdf_monitor/2 predicate to track changes to the repository. Where previous versions autosaved the whole database using the quick-load format of rdf_db, this version is based on a quick-load file per source (4th argument of rdf/4) and journalling for edit operations.

The result is safe. It avoids frequent small changes to large files, which make synchronisation and backup expensive, and it avoids long disruptions of the server while autosaving. Only loading large files disrupts service for some time.

The persistent backup of the database is realised in a directory, using a lock file to avoid corruption due to concurrent access. Each source is represented by two files, the latest snapshot and a journal. The state is restored by loading the snapshot and replaying the journal. The predicate rdf_flush_journals/1 can be used to create fresh snapshots and delete the journals.
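
A minimal usage sketch of the attach, edit, flush and detach cycle described here, assuming a writable directory and the standard semweb package. The predicate name demo_store/1, the graph name demo_graph and the example.org URIs are illustrative assumptions and not part of this module; the options are those documented for rdf_attach_db/2 and rdf_flush_journals/1.

```prolog
:- use_module(library(semweb/rdf_db)).
:- use_module(library(semweb/rdf_persistency)).

%  demo_store(+Dir) is a hypothetical helper: attach Dir as the
%  persistent store, add one triple (which is journalled), turn the
%  journal of demo_graph into a fresh snapshot and detach again.
demo_store(Dir) :-
    rdf_attach_db(Dir, [access(read_write), silent(true)]),
    rdf_assert('http://example.org/s',
               'http://example.org/p',
               literal(hello),
               demo_graph),
    rdf_flush_journals([graph(demo_graph)]),
    rdf_detach_db.
```

Attaching the same directory again should restore the triple from the snapshot rather than by replaying a journal.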

See also
- rdf_edit.pl
To be done
- If there is a complete `.new' snapshot and no journal, we should move the .new to the plain snapshot name as a means of recovery.
- Backup of each graph using one or two files is very costly if there are many graphs. Although the currently used subdirectories avoid hitting OS limits early, this is still not ideal. Probably we should collect (small, older?) files and combine them into a single quick load file. We could call this (similar to GIT) a `pack'.
:- volatile
    rdf_directory/1,
    rdf_lock/2,
    rdf_option/1,
    source_journal_fd/2,
    file_base_db/2.
:- dynamic
    rdf_directory/1,                % Absolute path
    rdf_lock/2,                     % Dir, Lock
    rdf_option/1,                   % Defined options
    source_journal_fd/2,            % DB, JournalFD
    file_base_db/2.                 % FileBase, DB

:- meta_predicate
    no_agc(0).

:- predicate_options(rdf_attach_db/2, 2,
                     [ access(oneof([read_write,read_only])),
                       concurrency(positive_integer),
                       max_open_journals(positive_integer),
                       silent(oneof([true,false,brief])),
                       log_nested_transactions(boolean)
                     ]).
 rdf_attach_db(+Directory, +Options) is det
Start persistent operations using Directory as place to store files. There are several cases:

Options:

access(+AccessMode)
One of auto (default), read_write or read_only. Read-only access implies that the RDF store is not locked. It is read at startup and all modifications to the data are temporary. The default auto mode is read_write if the directory is writeable and the lock can be acquired. Otherwise it reverts to read_only.
concurrency(+Jobs)
Number of threads to use for loading the initial database. If not provided, it is the number of CPUs as obtained from the flag cpu_count.
max_open_journals(+Count)
Maximum number of journals kept open. If not provided, the default is 10. See limit_fd_pool/0.
directory_levels(+Count)
Number of levels of intermediate directories for storing the graph files. Default is 2.
silent(+BoolOrBrief)
If true (default false), do not print informational messages. If brief, show only minimal feedback.
log_nested_transactions(+Boolean)
If true, nested log transactions are added to the journal information. By default (false), no log-term is added for nested transactions.
Errors
- existence_error(source_sink, Directory)
- permission_error(write, directory, Directory)
  172rdf_attach_db(DirSpec, Options) :-
  173    option(access(read_only), Options),
  174    !,
  175    absolute_file_name(DirSpec,
  176                       Directory,
  177                       [ access(read),
  178                         file_type(directory)
  179                       ]),
  180    rdf_attach_db_ro(Directory, Options).
  181rdf_attach_db(DirSpec, Options) :-
  182    option(access(read_write), Options),
  183    !,
  184    rdf_attach_db_rw(DirSpec, Options).
  185rdf_attach_db(DirSpec, Options) :-
  186    absolute_file_name(DirSpec,
  187                       Directory,
  188                       [ access(exist),
  189                         file_type(directory),
  190                         file_errors(fail)
  191                       ]),
  192    !,
  193    (   access_file(Directory, write)
  194    ->  catch(rdf_attach_db_rw(Directory, Options), E, true),
  195        (   var(E)
  196        ->  true
  197        ;   E = error(permission_error(lock, rdf_db, _), _)
  198        ->  print_message(warning, E),
  199            print_message(warning, rdf(read_only)),
  200            rdf_attach_db(DirSpec, [access(read_only)|Options])
  201        ;   throw(E)
  202        )
  203    ;   print_message(warning,
  204                      error(permission_error(write, directory, Directory))),
  205        print_message(warning, rdf(read_only)),
  206        rdf_attach_db_ro(Directory, Options)
  207    ).
  208rdf_attach_db(DirSpec, Options) :-
  209    catch(rdf_attach_db_rw(DirSpec, Options), E, true),
  210    (   var(E)
  211    ->  true
  212    ;   print_message(warning, E),
  213        print_message(warning, rdf(read_only)),
  214        rdf_attach_db(DirSpec, [access(read_only)|Options])
  215    ).
  216
  217
  218rdf_attach_db_rw(DirSpec, Options) :-
  219    absolute_file_name(DirSpec,
  220                       Directory,
  221                       [ access(write),
  222                         file_type(directory),
  223                         file_errors(fail)
  224                       ]),
  225    !,
  226    (   rdf_directory(Directory)
  227    ->  true                        % update settings?
  228    ;   rdf_detach_db,
  229        mkdir(Directory),
  230        lock_db(Directory),
  231        assert(rdf_directory(Directory)),
  232        assert_options(Options),
  233        stop_monitor,               % make sure not to register load
  234        no_agc(load_db),
  235        at_halt(rdf_detach_db),
  236        start_monitor
  237    ).
  238rdf_attach_db_rw(DirSpec, Options) :-
  239    absolute_file_name(DirSpec,
  240                       Directory,
  241                       [ solutions(all)
  242                       ]),
  243    (   exists_directory(Directory)
  244    ->  access_file(Directory, write)
  245    ;   catch(make_directory(Directory), _, fail)
  246    ),
  247    !,
  248    rdf_attach_db(Directory, Options).
  249rdf_attach_db_rw(DirSpec, _) :-         % Generate an existence or
  250    absolute_file_name(DirSpec,     % permission error
  251                       Directory,
  252                       [ access(exist),
  253                         file_type(directory)
  254                       ]),
  255    permission_error(write, directory, Directory).
 rdf_attach_db_ro(+Directory, +Options)
Open an RDF database in read-only mode.
  261rdf_attach_db_ro(Directory, Options) :-
  262    rdf_detach_db,
  263    assert(rdf_directory(Directory)),
  264    assert_options(Options),
  265    stop_monitor,           % make sure not to register load
  266    no_agc(load_db).
  267
  268
  269assert_options([]).
  270assert_options([H|T]) :-
  271    (   option_type(H, Check)
  272    ->  Check,
  273        assert(rdf_option(H))
  274    ;   true                        % ignore options we do not understand
  275    ),
  276    assert_options(T).
  277
  278option_type(concurrency(X),             must_be(positive_integer, X)).
  279option_type(max_open_journals(X),       must_be(positive_integer, X)).
  280option_type(directory_levels(X),        must_be(positive_integer, X)).
  281option_type(silent(X),                  must_be(oneof([true,false,brief]), X)).
  282option_type(log_nested_transactions(X), must_be(boolean, X)).
  283option_type(access(X),                  must_be(oneof([read_write,
  284                                                       read_only]), X)).
 rdf_persistency_property(?Property) is nondet
True when Property is a property of the current persistent database. Exposes the properties that can be passed as options to rdf_attach_db/2. Specifically, rdf_persistency_property(access(read_only)) is true iff the database is mounted in read-only mode. In addition, the following property is supported:
directory(Dir)
The directory in which the database resides.
  299rdf_persistency_property(Property) :-
  300    var(Property),
  301    !,
  302    rdf_persistency_property_(Property).
  303rdf_persistency_property(Property) :-
  304    rdf_persistency_property_(Property),
  305    !.
  306
  307rdf_persistency_property_(Property) :-
  308    rdf_option(Property).
  309rdf_persistency_property_(directory(Dir)) :-
  310    rdf_directory(Dir).
 no_agc(:Goal)
Run Goal with atom garbage collection disabled. Loading an RDF database creates large amounts of atoms we know are not garbage.
  318no_agc(Goal) :-
  319    current_prolog_flag(agc_margin, Old),
  320    setup_call_cleanup(
  321        set_prolog_flag(agc_margin, 0),
  322        Goal,
  323        set_prolog_flag(agc_margin, Old)).
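
The pattern used by no_agc/1 (save a flag, change it, restore it even if the goal fails or throws) can be sketched in general form. with_flag/3 and the flag demo_flag are hypothetical names introduced only for this illustration; they are not part of the module.

```prolog
:- meta_predicate with_flag(+, +, 0).

%  with_flag(+Flag, +Value, :Goal): run Goal with Flag temporarily
%  set to Value.  setup_call_cleanup/3 guarantees that the old value
%  is restored when Goal succeeds, fails or raises an exception.
with_flag(Flag, Value, Goal) :-
    current_prolog_flag(Flag, Old),
    setup_call_cleanup(
        set_prolog_flag(Flag, Value),
        Goal,
        set_prolog_flag(Flag, Old)).

%  A private flag to experiment with (an assumption of this sketch).
:- create_prolog_flag(demo_flag, 1, [type(integer)]).
```

For example, `with_flag(demo_flag, 2, current_prolog_flag(demo_flag, V))` binds V to the temporary value, after which the flag is back at its old value.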
 rdf_detach_db is det
Detach from the current database. Succeeds silently if no database is attached. Normally called at the end of the program through at_halt/1.
  332rdf_detach_db :-
  333    debug(halt, 'Detaching RDF database', []),
  334    stop_monitor,
  335    close_journals,
  336    (   retract(rdf_directory(Dir))
  337    ->  debug(halt, 'DB Directory: ~w', [Dir]),
  338        save_prefixes(Dir),
  339        retractall(rdf_option(_)),
  340        retractall(source_journal_fd(_,_)),
  341        unlock_db(Dir)
  342    ;   true
  343    ).
 rdf_current_db(?Dir)
True if Dir is the current RDF persistent database.
  350rdf_current_db(Directory) :-
  351    rdf_directory(Dir),
  352    !,
  353    Dir = Directory.
 rdf_flush_journals(+Options)
Flush dirty journals. Options:
min_size(+KB)
Only flush if journal is over KB in size.
graph(+Graph)
Only flush the journal of Graph
To be done
- Provide a default for min_size?
  367rdf_flush_journals(Options) :-
  368    option(graph(Graph), Options, _),
  369    forall(rdf_graph(Graph),
  370           rdf_flush_journal(Graph, Options)).
  371
  372rdf_flush_journal(Graph, Options) :-
  373    db_files(Graph, _SnapshotFile, JournalFile),
  374    db_file(JournalFile, File),
  375    (   \+ exists_file(File)
  376    ->  true
  377    ;   memberchk(min_size(KB), Options),
  378        size_file(File, Size),
  379        Size / 1024 < KB
  380    ->  true
  381    ;   create_db(Graph)
  382    ).
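
The size test in rdf_flush_journal/2 can be isolated as a small decision predicate. This is a sketch under the assumption that the journal exists and its size in bytes is already known; should_flush/2 is a hypothetical name, not part of the module.

```prolog
%  should_flush(+SizeBytes, +Options): true if a journal of SizeBytes
%  must be flushed.  Without a min_size(KB) option every existing
%  journal is flushed; with it, journals smaller than KB kilobytes
%  are left alone.
should_flush(SizeBytes, Options) :-
    (   memberchk(min_size(KB), Options)
    ->  SizeBytes / 1024 >= KB
    ;   true
    ).
```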
  383
  384                 /*******************************
  385                 *             LOAD             *
  386                 *******************************/
 load_db is det
Reload database from the directory specified by rdf_directory/1. First we find all named graphs using find_dbs/4 and then we load them.
  394load_db :-
  395    rdf_directory(Dir),
  396    concurrency(Jobs),
  397    cpu_stat_key(Jobs, StatKey),
  398    get_time(Wall0),
  399    statistics(StatKey, T0),
  400    load_prefixes(Dir),
  401    verbosity(Silent),
  402    find_dbs(Dir, Graphs, SnapShots, Journals),
  403    length(Graphs, GraphCount),
  404    maplist(rdf_unload_graph, Graphs),
  405    rdf_statistics(triples(Triples0)),
  406    load_sources(snapshots, SnapShots, Silent, Jobs),
  407    load_sources(journals, Journals, Silent, Jobs),
  408    rdf_statistics(triples(Triples1)),
  409    statistics(StatKey, T1),
  410    get_time(Wall1),
  411    T is T1 - T0,
  412    Wall is Wall1 - Wall0,
  413    Triples = Triples1 - Triples0,
  414    message_level(Silent, Level),
  415    print_message(Level, rdf(restore(attached(GraphCount, Triples, T/Wall)))).
  416
  417load_sources(_, [], _, _) :- !.
  418load_sources(Type, Sources, Silent, Jobs) :-
  419    length(Sources, Count),
  420    RunJobs is min(Count, Jobs),
  421    print_message(informational, rdf(restoring(Type, Count, RunJobs))),
  422    make_goals(Sources, Silent, 1, Count, Goals),
  423    concurrent(RunJobs, Goals, []).
 make_goals(+DBs, +Silent, +Index, +Total, -Goals)
  428make_goals([], _, _, _, []).
  429make_goals([DB|T0], Silent, I,  Total,
  430           [load_source(DB, Silent, I, Total)|T]) :-
  431    I2 is I + 1,
  432    make_goals(T0, Silent, I2, Total, T).
  433
  434verbosity(Silent) :-
  435    rdf_option(silent(Silent)),
  436    !.
  437verbosity(Silent) :-
  438    current_prolog_flag(verbose, silent),
  439    !,
  440    Silent = true.
  441verbosity(brief).
 concurrency(-Jobs)
Number of jobs to run concurrently.
  448concurrency(Jobs) :-
  449    rdf_option(concurrency(Jobs)),
  450    !.
  451concurrency(Jobs) :-
  452    current_prolog_flag(cpu_count, Jobs),
  453    Jobs > 0,
  454    !.
  455concurrency(1).
  456
  457cpu_stat_key(1, cputime) :- !.
  458cpu_stat_key(_, process_cputime).
 find_dbs(+Dir, -Graphs, -SnapBySize, -JournalBySize) is det
Scan the persistent database and return a list of snapshots and journals, both sorted by file-size. Each term is of the form
db(Size, Ext, DB, DBFile, Depth)
  470find_dbs(Dir, Graphs, SnapBySize, JournalBySize) :-
  471    directory_files(Dir, Files),
  472    phrase(scan_db_files(Files, Dir, '.', 0), Scanned),
  473    maplist(db_graph, Scanned, UnsortedGraphs),
  474    sort(UnsortedGraphs, Graphs),
  475    (   consider_reindex_db(Dir, Graphs, Scanned)
  476    ->  find_dbs(Dir, Graphs, SnapBySize, JournalBySize)
  477    ;   partition(db_is_snapshot, Scanned, Snapshots, Journals),
  478        sort(Snapshots, SnapBySize),
  479        sort(Journals, JournalBySize)
  480    ).
  481
  482consider_reindex_db(Dir, Graphs, Scanned) :-
  483    length(Graphs, Count),
  484    Count > 0,
  485    DepthNeeded is floor(log(Count)/log(256)),
  486    (   maplist(depth_db(DepthNow), Scanned)
  487    ->  (   DepthNeeded > DepthNow
  488        ->  true
  489        ;   retractall(rdf_option(directory_levels(_))),
  490            assertz(rdf_option(directory_levels(DepthNow))),
  491            fail
  492        )
  493    ;   true
  494    ),
  495    reindex_db(Dir, DepthNeeded).
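
The depth rule in consider_reindex_db/3 amounts to one extra directory level for each factor of 256 in the number of graphs. The helper depth_needed/2 below is a hypothetical name used only to show the arithmetic.

```prolog
%  depth_needed(+GraphCount, -Depth): number of intermediate
%  directory levels wanted for GraphCount graphs, using the same
%  expression as consider_reindex_db/3.
depth_needed(Count, Depth) :-
    Count > 0,
    Depth is floor(log(Count)/log(256)).
```

With this rule a store of 1000 graphs asks for one level and a store of 100000 graphs for two.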
  496
  497db_is_snapshot(Term) :-
  498    arg(2, Term, trp).
  499
  500db_graph(Term, DB) :-
  501    arg(3, Term, DB).
  502
  503db_file_name(Term, File) :-
  504    arg(4, Term, File).
  505
  506depth_db(Depth, DB) :-
  507    arg(5, DB, Depth).
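
Because Size is the first argument of each db/5 term, sorting in the standard order of terms orders the files by size. A small sketch of the split-and-sort step in find_dbs/4, with made-up graph names and file names; split_by_kind/3 and is_snapshot/1 are hypothetical helpers.

```prolog
:- use_module(library(apply)).

%  split_by_kind(+Scanned, -Snapshots, -Journals): separate snapshot
%  (.trp) terms from journal (.jrn) terms and sort both by size.
%  Note that sort/2 also removes duplicate terms.
split_by_kind(Scanned, Snapshots, Journals) :-
    partition(is_snapshot, Scanned, Snapshots0, Journals0),
    sort(Snapshots0, Snapshots),
    sort(Journals0, Journals).

is_snapshot(db(_Size, trp, _DB, _File, _Depth)).
```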
 scan_db_files(+Files, +Dir, +Prefix, +Depth)// is det
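Produces a list of db(Size, Ext, DB, File, Depth) terms for all recognised RDF database files. File is the database directory Dir joined with the path of the file relative to it, and Depth is the number of directory levels below Dir.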
  514scan_db_files([], _, _, _) -->
  515    [].
  516scan_db_files([Nofollow|T], Dir, Prefix, Depth) -->
  517    { nofollow(Nofollow) },
  518    !,
  519    scan_db_files(T, Dir, Prefix, Depth).
  520scan_db_files([File|T], Dir, Prefix, Depth) -->
  521    { file_name_extension(Base, Ext, File),
  522      db_extension(Ext),
  523      !,
  524      rdf_db_to_file(DB, Base),
  525      directory_file_path(Prefix, File, DBFile),
  526      directory_file_path(Dir, DBFile, AbsFile),
  527      size_file(AbsFile, Size)
  528    },
  529    [ db(Size, Ext, DB, AbsFile, Depth) ],
  530    scan_db_files(T, Dir, Prefix, Depth).
  531scan_db_files([D|T], Dir, Prefix, Depth) -->
  532    { directory_file_path(Prefix, D, SubD),
  533      directory_file_path(Dir, SubD, AbsD),
  534      exists_directory(AbsD),
  535      \+ read_link(AbsD, _, _),    % Do not follow links
  536      !,
  537      directory_files(AbsD, SubFiles),
  538      SubDepth is Depth + 1
  539    },
  540    scan_db_files(SubFiles, Dir, SubD, SubDepth),
  541    scan_db_files(T, Dir, Prefix, Depth).
  542scan_db_files([_|T], Dir, Prefix, Depth) -->
  543    scan_db_files(T, Dir, Prefix, Depth).
  544
  545nofollow(.).
  546nofollow(..).
  547
  548db_extension(trp).
  549db_extension(jrn).
  550
  551:- public load_source/4.                % called through make_goals/5
  552
  553load_source(DB, Silent, Nth, Total) :-
  554    db_file_name(DB, File),
  555    db_graph(DB, Graph),
  556    message_level(Silent, Level),
  557    graph_triple_count(Graph, Count0),
  558    statistics(cputime, T0),
  559    (   db_is_snapshot(DB)
  560    ->  print_message(Level, rdf(restore(Silent, snapshot(Graph, File)))),
  561        rdf_load_db(File)
  562    ;   print_message(Level, rdf(restore(Silent, journal(Graph, File)))),
  563        load_journal(File, Graph)
  564    ),
  565    statistics(cputime, T1),
  566    T is T1 - T0,
  567    graph_triple_count(Graph, Count1),
  568    Count is Count1 - Count0,
  569    print_message(Level, rdf(restore(Silent,
  570                                     done(Graph, T, Count, Nth, Total)))).
  571
  572
  573graph_triple_count(Graph, Count) :-
  574    rdf_statistics(triples_by_graph(Graph, Count)),
  575    !.
  576graph_triple_count(_, 0).
 attach_graph(+Graph, +Options) is det
Load triples and reload journal from the indicated snapshot file.
  584attach_graph(Graph, Options) :-
  585    (   option(silent(true), Options)
  586    ->  Level = silent
  587    ;   Level = informational
  588    ),
  589    db_files(Graph, SnapshotFile, JournalFile),
  590    rdf_retractall(_,_,_,Graph),
  591    statistics(cputime, T0),
  592    print_message(Level, rdf(restore(Silent, Graph))),
  593    db_file(SnapshotFile, AbsSnapShot),
  594    (   exists_file(AbsSnapShot)
  595    ->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
  596        rdf_load_db(AbsSnapShot)
  597    ;   true
  598    ),
  599    (   exists_db(JournalFile)
  600    ->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
  601        load_journal(JournalFile, Graph)
  602    ;   true
  603    ),
  604    statistics(cputime, T1),
  605    T is T1 - T0,
  606    (   rdf_statistics(triples_by_graph(Graph, Count))
  607    ->  true
  608    ;   Count = 0
  609    ),
  610    print_message(Level, rdf(restore(Silent,
  611                                     done(Graph, T, Count)))).
  612
  613message_level(true, silent) :- !.
  614message_level(_, informational).
  615
  616
  617                 /*******************************
  618                 *         LOAD JOURNAL         *
  619                 *******************************/
 load_journal(+File:atom, +DB:atom) is det
Process transactions from the RDF journal File, adding the given named graph.
  626load_journal(File, DB) :-
  627    rdf_create_graph(DB),
  628    setup_call_cleanup(
  629        open(File, read, In, [encoding(utf8)]),
  630        ( read(In, T0),
  631          process_journal(T0, In, DB)
  632        ),
  633        close(In)).
  634
  635process_journal(end_of_file, _, _) :- !.
  636process_journal(Term, In, DB) :-
  637    (   process_journal_term(Term, DB)
  638    ->  true
  639    ;   throw(error(type_error(journal_term, Term), _))
  640    ),
  641    read(In, T2),
  642    process_journal(T2, In, DB).
  643
  644process_journal_term(assert(S,P,O), DB) :-
  645    rdf_assert(S,P,O,DB).
  646process_journal_term(assert(S,P,O,Line), DB) :-
  647    rdf_assert(S,P,O,DB:Line).
  648process_journal_term(retract(S,P,O), DB) :-
  649    rdf_retractall(S,P,O,DB).
  650process_journal_term(retract(S,P,O,Line), DB) :-
  651    rdf_retractall(S,P,O,DB:Line).
  652process_journal_term(update(S,P,O,Action), DB) :-
  653    (   rdf_update(S,P,O,DB, Action)
  654    ->  true
  655    ;   print_message(warning, rdf(update_failed(S,P,O,Action)))
  656    ).
  657process_journal_term(start(_), _).      % journal open/close
  658process_journal_term(end(_), _).
  659process_journal_term(begin(_), _).      % logged transaction (compatibility)
  660process_journal_term(end, _).
  661process_journal_term(begin(_,_,_,_), _). % logged transaction (current)
  662process_journal_term(end(_,_,_), _).
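
To illustrate how replaying a journal restores state, the following sketch replays journal text against a plain list of rdf/3 terms instead of the RDF store. It is a simplification: it handles only ground assert/3 and retract/3 terms plus the bookkeeping terms, and the names replay_journal/3, replay/4 and apply_term/3 are assumptions of this example.

```prolog
:- use_module(library(apply)).
:- use_module(library(error)).

%  replay_journal(+JournalText, +Triples0, -Triples): read terms
%  from JournalText and apply them in order to the triple list.
replay_journal(Text, Triples0, Triples) :-
    setup_call_cleanup(
        open_string(Text, In),
        ( read(In, T0),
          replay(T0, In, Triples0, Triples)
        ),
        close(In)).

replay(end_of_file, _, Ts, Ts) :- !.
replay(Term, In, Ts0, Ts) :-
    apply_term(Term, Ts0, Ts1),
    read(In, Next),
    replay(Next, In, Ts1, Ts).

apply_term(assert(S,P,O), Ts, [rdf(S,P,O)|Ts]) :- !.
apply_term(retract(S,P,O), Ts0, Ts) :- !,
    exclude(==(rdf(S,P,O)), Ts0, Ts).
apply_term(start(_), Ts, Ts) :- !.          % journal open/close
apply_term(end(_), Ts, Ts) :- !.
apply_term(begin(_,_,_,_), Ts, Ts) :- !.    % logged transaction
apply_term(end(_,_,_), Ts, Ts) :- !.
apply_term(Term, _, _) :-                   % unknown term: complain
    type_error(journal_term, Term).
```

As in process_journal/3, an unrecognised term raises a type error instead of being skipped silently.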
  663
  664
  665                 /*******************************
  666                 *         CREATE JOURNAL       *
  667                 *******************************/
  668
  669:- dynamic
  670    blocked_db/2,                   % DB, Reason
  671    transaction_message/3,          % Nesting, Time, Message
  672    transaction_db/3.               % Nesting, DB, Id
 rdf_persistency(+DB, +Bool)
Specify whether a database is persistent. Switching to false kills the persistent state. Switching to true creates it.
  679rdf_persistency(DB, Bool) :-
  680    must_be(atom, DB),
  681    must_be(boolean, Bool),
  682    fail.
  683rdf_persistency(DB, false) :-
  684    !,
  685    (   blocked_db(DB, persistency)
  686    ->  true
  687    ;   assert(blocked_db(DB, persistency)),
  688        delete_db(DB)
  689    ).
  690rdf_persistency(DB, true) :-
  691    (   retract(blocked_db(DB, persistency))
  692    ->  create_db(DB)
  693    ;   true
  694    ).
 rdf_db:property_of_graph(?Property, +Graph) is nondet
Extend rdf_graph_property/2 with new properties.
:- multifile
    rdf_db:property_of_graph/2.

rdf_db:property_of_graph(persistent(State), Graph) :-
    (   blocked_db(Graph, persistency)
    ->  State = false
    ;   State = true
    ).
 start_monitor is det
 stop_monitor is det
Start/stop monitoring the RDF database for changes and update the journal.
  716start_monitor :-
  717    rdf_monitor(monitor,
  718                [ -assert(load)
  719                ]).
  720stop_monitor :-
  721    rdf_monitor(monitor,
  722                [ -all
  723                ]).
 monitor(+Term) is semidet
Handle an rdf_monitor/2 callback to deal with persistency. Note that the monitor calls that come from rdf_db.pl that deal with database changes are serialized. They do come from different threads though.
  732monitor(Msg) :-
  733    debug(monitor, 'Monitor: ~p~n', [Msg]),
  734    fail.
  735monitor(assert(S,P,O,DB:Line)) :-
  736    !,
  737    \+ blocked_db(DB, _),
  738    journal_fd(DB, Fd),
  739    open_transaction(DB, Fd),
  740    format(Fd, '~q.~n', [assert(S,P,O,Line)]),
  741    sync_journal(DB, Fd).
  742monitor(assert(S,P,O,DB)) :-
  743    \+ blocked_db(DB, _),
  744    journal_fd(DB, Fd),
  745    open_transaction(DB, Fd),
  746    format(Fd, '~q.~n', [assert(S,P,O)]),
  747    sync_journal(DB, Fd).
  748monitor(retract(S,P,O,DB:Line)) :-
  749    !,
  750    \+ blocked_db(DB, _),
  751    journal_fd(DB, Fd),
  752    open_transaction(DB, Fd),
  753    format(Fd, '~q.~n', [retract(S,P,O,Line)]),
  754    sync_journal(DB, Fd).
  755monitor(retract(S,P,O,DB)) :-
  756    \+ blocked_db(DB, _),
  757    journal_fd(DB, Fd),
  758    open_transaction(DB, Fd),
  759    format(Fd, '~q.~n', [retract(S,P,O)]),
  760    sync_journal(DB, Fd).
  761monitor(update(S,P,O,DB:Line,Action)) :-
  762    !,
  763    \+ blocked_db(DB, _),
  764    (   Action = graph(NewDB)
  765    ->  monitor(assert(S,P,O,NewDB)),
  766        monitor(retract(S,P,O,DB:Line))
  767    ;   journal_fd(DB, Fd),
  768        format(Fd, '~q.~n', [update(S,P,O,Action)]),
  769        sync_journal(DB, Fd)
  770    ).
  771monitor(update(S,P,O,DB,Action)) :-
  772    \+ blocked_db(DB, _),
  773    (   Action = graph(NewDB)
  774    ->  monitor(assert(S,P,O,NewDB)),
  775        monitor(retract(S,P,O,DB))
  776    ;   journal_fd(DB, Fd),
  777        open_transaction(DB, Fd),
  778        format(Fd, '~q.~n', [update(S,P,O,Action)]),
  779        sync_journal(DB, Fd)
  780    ).
  781monitor(load(BE, _DumpFileURI)) :-
  782    (   BE = end(Graphs)
  783    ->  sync_loaded_graphs(Graphs)
  784    ;   true
  785    ).
  786monitor(create_graph(Graph)) :-
  787    \+ blocked_db(Graph, _),
  788    journal_fd(Graph, Fd),
  789    open_transaction(Graph, Fd),
  790    sync_journal(Graph, Fd).
  791monitor(reset) :-
  792    forall(rdf_graph(Graph), delete_db(Graph)).
  793                                        % TBD: Remove empty directories?
  794
  795monitor(transaction(BE, Id)) :-
  796    monitor_transaction(Id, BE).
  797
  798monitor_transaction(load_journal(DB), begin(_)) :-
  799    !,
  800    assert(blocked_db(DB, journal)).
  801monitor_transaction(load_journal(DB), end(_)) :-
  802    !,
  803    retractall(blocked_db(DB, journal)).
  804
  805monitor_transaction(parse(URI), begin(_)) :-
  806    !,
  807    (   blocked_db(URI, persistency)
  808    ->  true
  809    ;   assert(blocked_db(URI, parse))
  810    ).
  811monitor_transaction(parse(URI), end(_)) :-
  812    !,
  813    (   retract(blocked_db(URI, parse))
  814    ->  create_db(URI)
  815    ;   true
  816    ).
  817monitor_transaction(unload(DB), begin(_)) :-
  818    !,
  819    (   blocked_db(DB, persistency)
  820    ->  true
  821    ;   assert(blocked_db(DB, unload))
  822    ).
  823monitor_transaction(unload(DB), end(_)) :-
  824    !,
  825    (   retract(blocked_db(DB, unload))
  826    ->  delete_db(DB)
  827    ;   true
  828    ).
  829monitor_transaction(log(Msg), begin(N)) :-
  830    !,
  831    check_nested(N),
  832    get_time(Time),
  833    asserta(transaction_message(N, Time, Msg)).
  834monitor_transaction(log(_), end(N)) :-
  835    check_nested(N),
  836    retract(transaction_message(N, _, _)),
  837    !,
  838    findall(DB:Id, retract(transaction_db(N, DB, Id)), DBs),
  839    end_transactions(DBs, N).
  840monitor_transaction(log(Msg, DB), begin(N)) :-
  841    !,
  842    check_nested(N),
  843    get_time(Time),
  844    asserta(transaction_message(N, Time, Msg)),
  845    journal_fd(DB, Fd),
  846    open_transaction(DB, Fd).
  847monitor_transaction(log(Msg, _DB), end(N)) :-
  848    monitor_transaction(log(Msg), end(N)).
 check_nested(+Level) is semidet
True if we must log this transaction. This is always the case for toplevel transactions. Nested transactions are only logged if log_nested_transactions(true) is defined.
  857check_nested(0) :- !.
  858check_nested(_) :-
  859    rdf_option(log_nested_transactions(true)).
 open_transaction(+DB, +Fd) is det
Add a begin(Id, Level, Time, Message) term if a transaction involves DB. Id is an incremental integer, where each database has its own counter. Level is the nesting level, Time a floating point timestamp and Message the message provided as argument to the log message.
  870open_transaction(DB, Fd) :-
  871    transaction_message(N, Time, Msg),
  872    !,
  873    (   transaction_db(N, DB, _)
  874    ->  true
  875    ;   next_transaction_id(DB, Id),
  876        assert(transaction_db(N, DB, Id)),
  877        RoundedTime is round(Time*100)/100,
  878        format(Fd, '~q.~n', [begin(Id, N, RoundedTime, Msg)])
  879    ).
  880open_transaction(_,_).
 next_transaction_id(+DB, -Id) is det
Id is the number to use for the next logged transaction on DB. Transactions in each named graph are numbered in sequence. Searching the Id of the last transaction is performed by the 2nd clause, starting 1 KB from the end and doubling this offset on each failure.
:- dynamic
    current_transaction_id/2.

next_transaction_id(DB, Id) :-
    retract(current_transaction_id(DB, Last)),
    !,
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, Id) :-
    db_files(DB, _, Journal),
    exists_file(Journal),
    !,
    size_file(Journal, Size),
    open_db(Journal, read, In, []),
    call_cleanup(iterative_expand(In, Size, Last), close(In)),
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, 1) :-
    assert(current_transaction_id(DB, 1)).
  910
  911iterative_expand(_, 0, 0) :- !.
  912iterative_expand(In, Size, Last) :-     % Scan growing sections from the end
  913    Max is floor(log(Size)/log(2)),
  914    between(10, Max, Step),
  915    Offset is -(1<<Step),
  916    seek(In, Offset, eof, _),
  917    skip(In, 10),                   % records are line-based
  918    read(In, T0),
  919    last_transaction_id(T0, In, 0, Last),
  920    Last > 0,
  921    !.
  922iterative_expand(In, _, Last) :-        % Scan the whole file
  923    seek(In, 0, bof, _),
  924    read(In, T0),
  925    last_transaction_id(T0, In, 0, Last).
  926
  927last_transaction_id(end_of_file, _, Last, Last) :- !.
  928last_transaction_id(end(Id, _, _), In, _, Last) :-
  929    read(In, T1),
  930    last_transaction_id(T1, In, Id, Last).
  931last_transaction_id(_, In, Id, Last) :-
  932    read(In, T1),
  933    last_transaction_id(T1, In, Id, Last).
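
The offsets tried by iterative_expand/3 can be listed without touching a file. tail_offsets/2 is a hypothetical helper that reproduces only the arithmetic: it starts 1 KB from the end and doubles the distance for as long as the offset still fits in the file.

```prolog
%  tail_offsets(+Size, -Offsets): the seek offsets, relative to the
%  end of a journal of Size bytes, that are tried in order before
%  falling back to scanning the whole file.
tail_offsets(Size, Offsets) :-
    Size > 0,
    Max is floor(log(Size)/log(2)),
    findall(Offset,
            ( between(10, Max, Step),
              Offset is -(1<<Step)
            ),
            Offsets).
```

For a journal of 5000 bytes this yields -1024, -2048 and -4096. For a journal smaller than 1 KB the list is empty, so the whole file is scanned at once.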
 end_transactions(+DBs:list(atom:id), +Nesting) is det
End a transaction that affected the given list of databases. We write the list of other affected databases as an argument to the end-term to facilitate fast finding of the related transactions.

In each database, the transaction is ended with a term end(Id, Nesting, Others), where Id and Nesting are the transaction identifier and nesting (see open_transaction/2) and Others is a list of DB:Id, indicating other databases affected by the transaction.

end_transactions(DBs, N) :-
    end_transactions(DBs, DBs, N).

end_transactions([], _, _).
end_transactions([DB:Id|T], DBs, N) :-
    journal_fd(DB, Fd),
    once(select(DB:Id, DBs, Others)),
    format(Fd, 'end(~q, ~q, ~q).~n', [Id, N, Others]),
    sync_journal(DB, Fd),
    end_transactions(T, DBs, N).
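
% Illustration (not part of the library): the end/3 terms produced for
% a transaction that touched several graphs. demo_end_terms/3 is a
% hypothetical helper that builds the terms in memory instead of
% writing them to the journals; the graph names and ids in the query
% are made up.

demo_end_terms(DBs, Nesting, Terms) :-
    findall(DB-end(Id, Nesting, Others),
            ( member(DB:Id, DBs),
              once(select(DB:Id, DBs, Others))
            ),
            Terms).

% ?- demo_end_terms([g1:4, g2:7], 0, Terms).
% Terms = [g1-end(4, 0, [g2:7]), g2-end(7, 0, [g1:4])].
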
 sync_loaded_graphs(+Graphs)
 Called after a binary triple file has been loaded that added triples to the given graphs.
sync_loaded_graphs(Graphs) :-
    maplist(create_db, Graphs).


                 /*******************************
                 *         JOURNAL FILES        *
                 *******************************/
 journal_fd(+DB, -Stream) is det
Get an open stream to a journal. If the journal is not open, old journals are closed to satisfy the max_open_journals option. Then the journal is opened in append mode. Journal files are always encoded as UTF-8 for portability as well as to ensure full coverage of Unicode.
journal_fd(DB, Fd) :-
    source_journal_fd(DB, Fd),
    !.
journal_fd(DB, Fd) :-
    with_mutex(rdf_journal_file,
               journal_fd_(DB, Out)),
    Fd = Out.

journal_fd_(DB, Fd) :-
    source_journal_fd(DB, Fd),
    !.
journal_fd_(DB, Fd) :-
    limit_fd_pool,
    db_files(DB, _Snapshot, Journal),
    open_db(Journal, append, Fd,
            [ close_on_abort(false)
            ]),
    time_stamp(Now),
    format(Fd, '~q.~n', [start([time(Now)])]),
    assert(source_journal_fd(DB, Fd)).              % new one at the end
 limit_fd_pool is det
 Limit the number of open journals to max_open_journals (default 10). Note that calls from rdf_monitor/2 are issued in different threads, but as they are part of write operations they are fully synchronised.
limit_fd_pool :-
    predicate_property(source_journal_fd(_, _), number_of_clauses(N)),
    !,
    (   rdf_option(max_open_journals(Max))
    ->  true
    ;   Max = 10
    ),
    Close is N - Max,
    forall(between(1, Close, _),
           close_oldest_journal).
limit_fd_pool.

close_oldest_journal :-
    source_journal_fd(DB, _Fd),
    !,
    debug(rdf_persistency, 'Closing old journal for ~q', [DB]),
    close_journal(DB).
close_oldest_journal.
 sync_journal(+DB, +Fd)
Sync journal represented by database and stream. If the DB is involved in a transaction there is no point flushing until the end of the transaction.
sync_journal(DB, _) :-
    transaction_db(_, DB, _),
    !.
sync_journal(_, Fd) :-
    flush_output(Fd).
 close_journal(+DB) is det
Close the journal associated with DB if it is open.
close_journal(DB) :-
    with_mutex(rdf_journal_file,
               close_journal_(DB)).

close_journal_(DB) :-
    (   retract(source_journal_fd(DB, Fd))
    ->  time_stamp(Now),
        format(Fd, '~q.~n', [end([time(Now)])]),
        close(Fd, [force(true)])
    ;   true
    ).
 close_journals
Close all open journals.
close_journals :-
    forall(source_journal_fd(DB, _),
           catch(close_journal(DB), E,
                 print_message(error, E))).
 create_db(+Graph)
 Create a saved version of Graph in the corresponding snapshot file, then close and delete its journal. If Graph contains no triples, its files are deleted instead.
create_db(Graph) :-
    \+ rdf(_,_,_,Graph),
    !,
    debug(rdf_persistency, 'Deleting empty Graph ~w', [Graph]),
    delete_db(Graph).
create_db(Graph) :-
    debug(rdf_persistency, 'Saving Graph ~w', [Graph]),
    close_journal(Graph),
    db_abs_files(Graph, Snapshot, Journal),
    atom_concat(Snapshot, '.new', NewSnapshot),
    (   catch(( create_directory_levels(Snapshot),
                rdf_save_db(NewSnapshot, Graph)
              ), Error,
              ( print_message(warning, Error),
                fail
              ))
    ->  (   exists_file(Journal)
        ->  delete_file(Journal)
        ;   true
        ),
        rename_file(NewSnapshot, Snapshot),
        debug(rdf_persistency, 'Saved Graph ~w', [Graph])
    ;   catch(delete_file(NewSnapshot), _, true)
    ).
 delete_db(+DB)
Remove snapshot and journal file for DB.
delete_db(DB) :-
    with_mutex(rdf_journal_file,
               delete_db_(DB)).

delete_db_(DB) :-
    close_journal_(DB),
    db_abs_files(DB, Snapshot, Journal),
    !,
    (   exists_file(Journal)
    ->  delete_file(Journal)
    ;   true
    ),
    (   exists_file(Snapshot)
    ->  delete_file(Snapshot)
    ;   true
    ).
delete_db_(_).

                 /*******************************
                 *             LOCKING          *
                 *******************************/
 lock_db(+Dir)
Lock the database directory Dir.
lock_db(Dir) :-
    lockfile(Dir, File),
    catch(open(File, update, Out, [lock(write), wait(false)]),
          error(permission_error(Access, _, _), _),
          locked_error(Access, Dir)),
    (   current_prolog_flag(pid, PID)
    ->  true
    ;   PID = 0                     % TBD: Fix in Prolog
    ),
    time_stamp(Now),
    gethostname(Host),
    format(Out, '/* RDF Database is in use */~n~n', []),
    format(Out, '~q.~n', [ locked([ time(Now),
                                    pid(PID),
                                    host(Host)
                                  ])
                         ]),
    flush_output(Out),
    set_end_of_stream(Out),
    assert(rdf_lock(Dir, lock(Out, File))),
    at_halt(unlock_db(Dir)).

locked_error(lock, Dir) :-
    lockfile(Dir, File),
    (   catch(read_file_to_terms(File, Terms, []), _, fail),
        Terms = [locked(Args)]
    ->  Context = rdf_locked(Args)
    ;   Context = context(_, 'Database is in use')
    ),
    throw(error(permission_error(lock, rdf_db, Dir), Context)).
locked_error(open, Dir) :-
    throw(error(permission_error(lock, rdf_db, Dir),
                context(_, 'Lock file cannot be opened'))).
 unlock_db(+Dir) is det
 unlock_db(+Stream, +File) is det
unlock_db(Dir) :-
    retract(rdf_lock(Dir, lock(Out, File))),
    !,
    unlock_db(Out, File).
unlock_db(_).

unlock_db(Out, File) :-
    close(Out),
    delete_file(File).

                 /*******************************
                 *           FILENAMES          *
                 *******************************/

lockfile(Dir, LockFile) :-
    atomic_list_concat([Dir, /, lock], LockFile).

directory_levels(Levels) :-
    rdf_option(directory_levels(Levels)),
    !.
directory_levels(2).

db_file(Base, File) :-
    rdf_directory(Dir),
    directory_levels(Levels),
    db_file(Dir, Base, Levels, File).

db_file(Dir, Base, Levels, File) :-
    dir_levels(Base, Levels, Segments, [Base]),
    atomic_list_concat([Dir|Segments], /, File).

open_db(Base, Mode, Stream, Options) :-
    db_file(Base, File),
    create_directory_levels(File),
    open(File, Mode, Stream, [encoding(utf8)|Options]).

create_directory_levels(_File) :-
    rdf_option(directory_levels(0)),
    !.
create_directory_levels(File) :-
    file_directory_name(File, Dir),
    make_directory_path(Dir).

exists_db(Base) :-
    db_file(Base, File),
    exists_file(File).
 dir_levels(+File, +Levels, ?Segments, ?Tail) is det
Create a list of intermediate directory names for File. Each directory consists of two hexadecimal digits.
dir_levels(_, 0, Segments, Segments) :- !.
dir_levels(File, Levels, Segments, Tail) :-
    rdf_atom_md5(File, 1, Hash),
    create_dir_levels(Levels, 0, Hash, Segments, Tail).

create_dir_levels(0, _, _, Segments, Segments) :- !.
create_dir_levels(N, S, Hash, [S1|Segments0], Tail) :-
    sub_atom(Hash, S, 2, _, S1),
    S2 is S+2,
    N2 is N-1,
    create_dir_levels(N2, S2, Hash, Segments0, Tail).
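
% Illustration (not part of the library): how the leading characters
% of a hash become directory names. demo_hash_segments/4 is a
% hypothetical helper that takes the hash as an argument instead of
% calling rdf_atom_md5/3; the hash and file name in the query are made
% up and much shorter than a real MD5 digest.

demo_hash_segments(Hash, Levels, Base, Segments) :-
    findall(Dir,
            ( between(1, Levels, N),
              Start is (N-1)*2,
              sub_atom(Hash, Start, 2, _, Dir)
            ),
            Dirs),
    append(Dirs, [Base], Segments).

% ?- demo_hash_segments(a1b2c3, 2, 'graph.trp', Segments).
% Segments = [a1, b2, 'graph.trp'].
%
% With Levels = 0 the file is stored directly in the store directory:
%
% ?- demo_hash_segments(a1b2c3, 0, 'graph.trp', Segments).
% Segments = ['graph.trp'].
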
 db_files(+DB, -Snapshot, -Journal)
db_files(-DB, +Snapshot, -Journal)
db_files(-DB, -Snapshot, +Journal)
True if named graph DB is represented by the files Snapshot and Journal. The filenames are local to the directory representing the store.
db_files(DB, Snapshot, Journal) :-
    nonvar(DB),
    !,
    rdf_db_to_file(DB, Base),
    atom_concat(Base, '.trp', Snapshot),
    atom_concat(Base, '.jrn', Journal).
db_files(DB, Snapshot, Journal) :-
    nonvar(Snapshot),
    !,
    atom_concat(Base, '.trp', Snapshot),
    atom_concat(Base, '.jrn', Journal),
    rdf_db_to_file(DB, Base).
db_files(DB, Snapshot, Journal) :-
    nonvar(Journal),
    !,
    atom_concat(Base, '.jrn', Journal),
    atom_concat(Base, '.trp', Snapshot),
    rdf_db_to_file(DB, Base).

db_abs_files(DB, Snapshot, Journal) :-
    db_files(DB, Snapshot0, Journal0),
    db_file(Snapshot0, Snapshot),
    db_file(Journal0, Journal).
 rdf_journal_file(+Graph, -File) is semidet
rdf_journal_file(-Graph, -File) is nondet
 True if File is the name of the existing journal file for Graph.
rdf_journal_file(Graph, Journal) :-
    (   var(Graph)
    ->  rdf_graph(Graph)
    ;   true
    ),
    db_abs_files(Graph, _Snapshot, Journal),
    exists_file(Journal).
 rdf_snapshot_file(+Graph, -File) is semidet
rdf_snapshot_file(-Graph, -File) is nondet
 True if File is the name of the existing snapshot file for Graph.
rdf_snapshot_file(Graph, Snapshot) :-
    (   var(Graph)
    ->  rdf_graph(Graph)    % also pick the empty graphs
    ;   true
    ),
    db_abs_files(Graph, Snapshot, _Journal),
    exists_file(Snapshot).
 rdf_db_to_file(+DB, -File) is det
rdf_db_to_file(-DB, +File) is det
 Translate between database encoding (often a file or URL) and the name we store in the directory. We keep a cache for two reasons: speed and, much more importantly, the fact that the mapping of raw --> encoded provided by www_form_encode/2 is not guaranteed to be unique by the W3C standards.
rdf_db_to_file(DB, File) :-
    file_base_db(File, DB),
    !.
rdf_db_to_file(DB, File) :-
    url_to_filename(DB, File),
    assert(file_base_db(File, DB)).
 url_to_filename(+URL, -FileName) is det
url_to_filename(-URL, +FileName) is det
 Turn a valid URL into a filename. Earlier versions used www_form_encode/2, but this can produce characters that are not valid in filenames. We use the same encoding as www_form_encode/2, but with our own rules for allowed characters. The only requirement is that we avoid any character that is special in filenames. The current encoding uses US-ASCII alphanumeric characters, _ and %.
url_to_filename(URL, FileName) :-
    atomic(URL),
    !,
    atom_codes(URL, Codes),
    phrase(url_encode(EncCodes), Codes),
    atom_codes(FileName, EncCodes).
url_to_filename(URL, FileName) :-
    uri_encoded(path, URL, FileName).

url_encode([0'+|T]) -->
    " ",
    !,
    url_encode(T).
url_encode([C|T]) -->
    alphanum(C),
    !,
    url_encode(T).
url_encode([C|T]) -->
    no_enc_extra(C),
    !,
    url_encode(T).
url_encode(Enc) -->
    (   "\r\n"
    ;   "\n"
    ),
    !,
    { string_codes("%0D%0A", Codes),
      append(Codes, T, Enc)
    },
    url_encode(T).
url_encode([]) -->
    eos,
    !.
url_encode([0'%,D1,D2|T]) -->
    [C],
    { Dv1 is (C>>4 /\ 0xf),
      Dv2 is (C /\ 0xf),
      code_type(D1, xdigit(Dv1)),
      code_type(D2, xdigit(Dv2))
    },
    url_encode(T).

eos([], []).

alphanum(C) -->
    [C],
    { C < 128,                      % US-ASCII
      code_type(C, alnum)
    }.

no_enc_extra(0'_) --> "_".
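
% Illustration (not part of the library): percent-encoding a single
% character code the way the last url_encode//1 clause does.
% demo_percent_encode/2 is a hypothetical helper; once/1 commits to
% the first hexadecimal digit character that code_type/2 enumerates
% for the given weight.

demo_percent_encode(C, [0'%, D1, D2]) :-
    Dv1 is (C>>4) /\ 0xf,           % high nibble
    Dv2 is C /\ 0xf,                % low nibble
    once(code_type(D1, xdigit(Dv1))),
    once(code_type(D2, xdigit(Dv2))).

% ?- demo_percent_encode(0'#, Codes), atom_codes(Atom, Codes).
% Atom = '%23'.
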


                 /*******************************
                 *             REINDEX          *
                 *******************************/
 reindex_db(+Dir, +Levels)
Reindex the database by creating intermediate directories.
reindex_db(Dir, Levels) :-
    directory_files(Dir, Files),
    reindex_files(Files, Dir, '.', 0, Levels),
    remove_empty_directories(Files, Dir).

reindex_files([], _, _, _, _).
reindex_files([Nofollow|Files], Dir, Prefix, CLevel, Levels) :-
    nofollow(Nofollow),
    !,
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
reindex_files([File|Files], Dir, Prefix, CLevel, Levels) :-
    CLevel \== Levels,
    file_name_extension(_Base, Ext, File),
    db_extension(Ext),
    !,
    directory_file_path(Prefix, File, DBFile),
    directory_file_path(Dir, DBFile, OldPath),
    db_file(Dir, File, Levels, NewPath),
    debug(rdf_persistency, 'Rename ~q --> ~q', [OldPath, NewPath]),
    file_directory_name(NewPath, NewDir),
    make_directory_path(NewDir),
    rename_file(OldPath, NewPath),
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
reindex_files([D|Files], Dir, Prefix, CLevel, Levels) :-
    directory_file_path(Prefix, D, SubD),
    directory_file_path(Dir, SubD, AbsD),
    exists_directory(AbsD),
    \+ read_link(AbsD, _, _),      % Do not follow links
    !,
    directory_files(AbsD, SubFiles),
    CLevel2 is CLevel + 1,
    reindex_files(SubFiles, Dir, SubD, CLevel2, Levels),
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
reindex_files([_|Files], Dir, Prefix, CLevel, Levels) :-
    reindex_files(Files, Dir, Prefix, CLevel, Levels).


remove_empty_directories([], _).
remove_empty_directories([File|Files], Dir) :-
    \+ nofollow(File),
    directory_file_path(Dir, File, Path),
    exists_directory(Path),
    \+ read_link(Path, _, _),
    !,
    directory_files(Path, Content),
    exclude(nofollow, Content, RealContent),
    (   RealContent == []
    ->  debug(rdf_persistency, 'Remove empty dir ~q', [Path]),
        delete_directory(Path)
    ;   remove_empty_directories(RealContent, Path)
    ),
    remove_empty_directories(Files, Dir).
remove_empty_directories([_|Files], Dir) :-
    remove_empty_directories(Files, Dir).


                 /*******************************
                 *            PREFIXES          *
                 *******************************/

save_prefixes(Dir) :-
    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
    setup_call_cleanup(open(PrefixFile, write, Out, [encoding(utf8)]),
                       write_prefixes(Out),
                       close(Out)).

write_prefixes(Out) :-
    format(Out, '% Snapshot of defined RDF prefixes~n~n', []),
    forall(rdf_current_ns(Alias, URI),
           format(Out, 'prefix(~q, ~q).~n', [Alias, URI])).
 load_prefixes(+RDFDBDir) is det
If the file RDFDBDir/prefixes.db exists, load the prefixes. The prefixes are registered using rdf_register_ns/3. Possible errors because the prefix definitions have changed are printed as warnings, retaining the old definition. Note that changing prefixes generally requires reloading all RDF from the source.
load_prefixes(Dir) :-
    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
    (   exists_file(PrefixFile)
    ->  setup_call_cleanup(open(PrefixFile, read, In, [encoding(utf8)]),
                           read_prefixes(In),
                           close(In))
    ;   true
    ).

read_prefixes(Stream) :-
    read_term(Stream, T0, []),
    read_prefixes(T0, Stream).

read_prefixes(end_of_file, _) :- !.
read_prefixes(prefix(Alias, URI), Stream) :-
    !,
    must_be(atom, Alias),
    must_be(atom, URI),
    catch(rdf_register_ns(Alias, URI, []), E,
          print_message(warning, E)),
    read_term(Stream, T, []),
    read_prefixes(T, Stream).
read_prefixes(Term, _) :-
    domain_error(prefix_term, Term).
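
% Illustration (not part of the library): the shape of prefixes.db as
% written by write_prefixes/1 and consumed by read_prefixes/1.
% demo_read_prefixes/2 is a hypothetical helper that parses the same
% prefix/2 terms from a string and collects Alias-URI pairs instead of
% calling rdf_register_ns/3; the prefix in the query is made up.

demo_read_prefixes(Text, Pairs) :-
    setup_call_cleanup(open_string(Text, In),
                       demo_read_prefix_terms(In, Pairs),
                       close(In)).

demo_read_prefix_terms(In, Pairs) :-
    read_term(In, Term, []),
    (   Term == end_of_file
    ->  Pairs = []
    ;   Term = prefix(Alias, URI)
    ->  Pairs = [Alias-URI|Rest],
        demo_read_prefix_terms(In, Rest)
    ;   domain_error(prefix_term, Term)
    ).

% ?- demo_read_prefixes("prefix(ex, 'http://example.org/').", Pairs).
% Pairs = [ex-'http://example.org/'].
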


                 /*******************************
                 *              UTIL            *
                 *******************************/
 mkdir(+Directory)
Create a directory if it does not already exist.
mkdir(Directory) :-
    exists_directory(Directory),
    !.
mkdir(Directory) :-
    make_directory(Directory).
 time_stamp(-Integer)
Return time-stamp rounded to integer.
time_stamp(Int) :-
    get_time(Now),
    Int is round(Now).


                 /*******************************
                 *            MESSAGES          *
                 *******************************/

:- multifile
    prolog:message/3,
    prolog:message_context/3.

prolog:message(rdf(Term)) -->
    message(Term).

message(restoring(Type, Count, Jobs)) -->
    [ 'Restoring ~D ~w using ~D concurrent workers'-[Count, Type, Jobs] ].
message(restore(attached(Graphs, Triples, Time/Wall))) -->
    { catch(Percent is round(100*Time/Wall), _, Percent = 0) },
    [ 'Loaded ~D graphs (~D triples) in ~2f sec. (~d% CPU = ~2f sec.)'-
      [Graphs, Triples, Wall, Percent, Time] ].
% attach_graph/2
message(restore(true, Action)) -->
    !,
    silent_message(Action).
message(restore(brief, Action)) -->
    !,
    brief_message(Action).
message(restore(_, Graph)) -->
    [ 'Restoring ~p ... '-[Graph], flush ].
message(restore(_, snapshot(_))) -->
    [ at_same_line, '(snapshot) '-[], flush ].
message(restore(_, journal(_))) -->
    [ at_same_line, '(journal) '-[], flush ].
message(restore(_, done(_, Time, Count))) -->
    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
% load_source/4
message(restore(_, snapshot(G, _))) -->
    [ 'Restoring ~p\t(snapshot)'-[G], flush ].
message(restore(_, journal(G, _))) -->
    [ 'Restoring ~p\t(journal)'-[G], flush ].
message(restore(_, done(_, Time, Count))) -->
    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
% journal handling
message(update_failed(S,P,O,Action)) -->
    [ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].
% directory reindexing
message(reindex(Count, Depth)) -->
    [ 'Restructuring database with ~d levels (~D graphs)'-[Depth, Count] ].
message(reindex(Depth)) -->
    [ 'Fixing database directory structure (~d levels)'-[Depth] ].
message(read_only) -->
    [ 'Cannot write persistent store; continuing in read-only mode.', nl,
      'All changes to the RDF store will be lost if this process terminates.'
    ].

silent_message(_Action) --> [].

brief_message(done(Graph, _Time, _Count, Nth, Total)) -->
    { file_base_name(Graph, Base) },
    [ at_same_line,
      '\r~p~`.t ~D of ~D graphs~72|'-[Base, Nth, Total],
      flush
    ].
brief_message(_) --> [].


prolog:message_context(rdf_locked(Args)) -->
    { memberchk(time(Time), Args),
      memberchk(pid(Pid), Args),
      format_time(string(S), '%+', Time)
    },
    [ nl,
      'locked at ~s by process id ~w'-[S,Pid]
    ].