View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2006-2015, University of Amsterdam
    7                              VU University Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(rdf_persistency,
   37          [ rdf_attach_db/2,            % +Directory, +Options
   38            rdf_detach_db/0,            % +Detach current Graph
   39            rdf_current_db/1,           % -Directory
   40            rdf_persistency/2,          % +Graph, +Bool
   41            rdf_flush_journals/1,       % +Options
   42            rdf_persistency_property/1, % ?Property
   43            rdf_journal_file/2,         % ?Graph, ?JournalFile
   44            rdf_snapshot_file/2,        % ?Graph, ?SnapshotFile
   45            rdf_db_to_file/2            % ?Graph, ?FileBase
   46          ]).   47:- use_module(library(semweb/rdf_db),
   48              [ rdf_graph/1, rdf_unload_graph/1, rdf_statistics/1,
   49                rdf_load_db/1, rdf_retractall/4, rdf_create_graph/1,
   50                rdf_assert/4, rdf_update/5, rdf_monitor/2, rdf/4,
   51                rdf_save_db/2, rdf_atom_md5/3, rdf_current_ns/2,
   52                rdf_register_ns/3
   53              ]).   54
   55:- autoload(library(apply),[maplist/2,maplist/3,partition/4,exclude/3]).   56:- use_module(library(debug),[debug/3]).   57:- autoload(library(error),
   58	    [permission_error/3,must_be/2,domain_error/2]).   59:- autoload(library(filesex),
   60	    [directory_file_path/3,make_directory_path/1]).   61:- autoload(library(lists),[select/3,append/3]).   62:- autoload(library(option),[option/2,option/3]).   63:- autoload(library(readutil),[read_file_to_terms/3]).   64:- autoload(library(socket),[gethostname/1]).   65:- autoload(library(thread),[concurrent/3]).   66:- autoload(library(uri),[uri_encoded/3]).   67
   68/** <module> RDF persistency plugin
   69
   70This  module  provides  persistency   for    rdf_db.pl   based   on  the
   71rdf_monitor/2 predicate to  track  changes   to  the  repository.  Where
   72previous  versions  used  autosafe  of  the  whole  database  using  the
   73quick-load format of rdf_db, this version is  based on a quick-load file
   74per source (4th argument of rdf/4), and journalling for edit operations.
   75
   76The result is safe, avoids frequent small   changes to large files which
   77makes synchronisation and backup expensive and avoids long disruption of
   78the server doing the autosafe. Only loading large files disrupts service
   79for some time.
   80
   81The persistent backup of the database is  realised in a directory, using
   82a lock file to avoid corruption due to concurrent access. Each source is
   83represented by two files, the latest snapshot   and a journal. The state
   84is restored by loading  the  snapshot   and  replaying  the journal. The
   85predicate rdf_flush_journals/1 can be used to create fresh snapshots and
   86delete the journals.
   87
   88@tbd If there is a complete `.new'   snapshot  and no journal, we should
   89     move the .new to the plain snapshot name as a means of recovery.
   90
   91@tbd Backup of each graph using one or two files is very costly if there
   92     are many graphs.  Although the currently used subdirectories avoid
   93     hitting OS limits early, this is still not ideal. Probably we
   94     should collect (small, older?) files and combine them into a single
   95     quick load file.  We could call this (similar to GIT) a `pack'.
   96
   97@see    rdf_edit.pl
   98*/
   99
  100:- volatile
  101    rdf_directory/1,
  102    rdf_lock/2,
  103    rdf_option/1,
  104    source_journal_fd/2,
  105    file_base_db/2.  106:- dynamic
  107    rdf_directory/1,                % Absolute path
  108    rdf_lock/2,                     % Dir, Lock
  109    rdf_option/1,                   % Defined options
  110    source_journal_fd/2,            % DB, JournalFD
  111    file_base_db/2.                 % FileBase, DB
  112
  113:- meta_predicate
  114    no_agc(0).  115
  116:- predicate_options(rdf_attach_db/2, 2,
  117                     [ access(oneof([read_write,read_only])),
  118                       concurrency(positive_integer),
  119                       max_open_journals(positive_integer),
  120                       silent(oneof([true,false,brief])),
  121                       log_nested_transactions(boolean)
  122                     ]).  123
  124%!  rdf_attach_db(+Directory, +Options) is det.
  125%
  126%   Start persistent operations using Directory   as  place to store
  127%   files.   There are several cases:
  128%
  129%           * Empty DB, existing directory
  130%           Load the DB from the existing directory
  131%
  132%           * Full DB, empty directory
  133%           Create snapshots for all sources in directory
  134%
  135%   Options:
  136%
  137%           * access(+AccessMode)
  138%           One of =auto= (default), =read_write= or
  139%           =read_only=. Read-only access implies that the RDF
  140%           store is not locked. It is read at startup and all
  141%           modifications to the data are temporary. The default
  142%           =auto= mode is =read_write= if the directory is
  143%           writeable and the lock can be acquired.  Otherwise
  144%           it reverts to =read_only=.
  145%
  146%           * concurrency(+Jobs)
  147%           Number of threads to use for loading the initial
  148%           database.  If not provided it is the number of CPUs
  149%           as optained from the flag =cpu_count=.
  150%
  151%           * max_open_journals(+Count)
  152%           Maximum number of journals kept open.  If not provided,
  153%           the default is 10.  See limit_fd_pool/0.
  154%
  155%           * directory_levels(+Count)
  156%           Number of levels of intermediate directories for storing
  157%           the graph files.  Default is 2.
  158%
  159%           * silent(+BoolOrBrief)
  160%           If =true= (default =false=), do not print informational
  161%           messages.  Finally, if =brief= it will show minimal
  162%           feedback.
  163%
  164%           * log_nested_transactions(+Boolean)
  165%           If =true=, nested _log_ transactions are added to the
  166%           journal information.  By default (=false=), no log-term
  167%           is added for nested transactions.\\
  168%
  169%   @error existence_error(source_sink, Directory)
  170%   @error permission_error(write, directory, Directory)
  171
  172rdf_attach_db(DirSpec, Options) :-
  173    option(access(read_only), Options),
  174    !,
  175    absolute_file_name(DirSpec,
  176                       Directory,
  177                       [ access(read),
  178                         file_type(directory)
  179                       ]),
  180    rdf_attach_db_ro(Directory, Options).
  181rdf_attach_db(DirSpec, Options) :-
  182    option(access(read_write), Options),
  183    !,
  184    rdf_attach_db_rw(DirSpec, Options).
  185rdf_attach_db(DirSpec, Options) :-
  186    absolute_file_name(DirSpec,
  187                       Directory,
  188                       [ access(exist),
  189                         file_type(directory),
  190                         file_errors(fail)
  191                       ]),
  192    !,
  193    (   access_file(Directory, write)
  194    ->  catch(rdf_attach_db_rw(Directory, Options), E, true),
  195        (   var(E)
  196        ->  true
  197        ;   E = error(permission_error(lock, rdf_db, _), _)
  198        ->  print_message(warning, E),
  199            print_message(warning, rdf(read_only)),
  200            rdf_attach_db(DirSpec, [access(read_only)|Options])
  201        ;   throw(E)
  202        )
  203    ;   print_message(warning,
  204                      error(permission_error(write, directory, Directory))),
  205        print_message(warning, rdf(read_only)),
  206        rdf_attach_db_ro(Directory, Options)
  207    ).
  208rdf_attach_db(DirSpec, Options) :-
  209    catch(rdf_attach_db_rw(DirSpec, Options), E, true),
  210    (   var(E)
  211    ->  true
  212    ;   print_message(warning, E),
  213        print_message(warning, rdf(read_only)),
  214        rdf_attach_db(DirSpec, [access(read_only)|Options])
  215    ).
  216
  217
  218rdf_attach_db_rw(DirSpec, Options) :-
  219    absolute_file_name(DirSpec,
  220                       Directory,
  221                       [ access(write),
  222                         file_type(directory),
  223                         file_errors(fail)
  224                       ]),
  225    !,
  226    (   rdf_directory(Directory)
  227    ->  true                        % update settings?
  228    ;   rdf_detach_db,
  229        mkdir(Directory),
  230        lock_db(Directory),
  231        assert(rdf_directory(Directory)),
  232        assert_options(Options),
  233        stop_monitor,               % make sure not to register load
  234        no_agc(load_db),
  235        at_halt(rdf_detach_db),
  236        start_monitor
  237    ).
  238rdf_attach_db_rw(DirSpec, Options) :-
  239    absolute_file_name(DirSpec,
  240                       Directory,
  241                       [ solutions(all)
  242                       ]),
  243    (   exists_directory(Directory)
  244    ->  access_file(Directory, write)
  245    ;   catch(make_directory(Directory), _, fail)
  246    ),
  247    !,
  248    rdf_attach_db(Directory, Options).
  249rdf_attach_db_rw(DirSpec, _) :-         % Generate an existence or
  250    absolute_file_name(DirSpec,     % permission error
  251                       Directory,
  252                       [ access(exist),
  253                         file_type(directory)
  254                       ]),
  255    permission_error(write, directory, Directory).
  256
  257%!  rdf_attach_db_ro(+Directory, +Options)
  258%
  259%   Open an RDF database in read-only mode.
  260
  261rdf_attach_db_ro(Directory, Options) :-
  262    rdf_detach_db,
  263    assert(rdf_directory(Directory)),
  264    assert_options(Options),
  265    stop_monitor,           % make sure not to register load
  266    no_agc(load_db).
  267
  268
  269assert_options([]).
  270assert_options([H|T]) :-
  271    (   option_type(H, Check)
  272    ->  Check,
  273        assert(rdf_option(H))
  274    ;   true                        % ignore options we do not understand
  275    ),
  276    assert_options(T).
  277
  278option_type(concurrency(X),             must_be(positive_integer, X)).
  279option_type(max_open_journals(X),       must_be(positive_integer, X)).
  280option_type(directory_levels(X),        must_be(positive_integer, X)).
  281option_type(silent(X),                  must_be(oneof([true,false,brief]), X)).
  282option_type(log_nested_transactions(X), must_be(boolean, X)).
  283option_type(access(X),                  must_be(oneof([read_write,
  284                                                       read_only]), X)).
  285
  286
  287%!  rdf_persistency_property(?Property) is nondet.
  288%
  289%   True when Property is a property of the current persistent database.
  290%   Exposes  the  properties  that  can   be    passed   as  options  to
  291%   rdf_attach_db/2.                                       Specifically,
  292%   rdf_persistency_property(access(read_only)) is true iff the database
  293%   is mounted in read-only mode. In addition, the following property is
  294%   supported:
  295%
  296%     - directory(Dir)
  297%     The directory in which the database resides.
  298
  299rdf_persistency_property(Property) :-
  300    var(Property),
  301    !,
  302    rdf_persistency_property_(Property).
  303rdf_persistency_property(Property) :-
  304    rdf_persistency_property_(Property),
  305    !.
  306
  307rdf_persistency_property_(Property) :-
  308    rdf_option(Property).
  309rdf_persistency_property_(directory(Dir)) :-
  310    rdf_directory(Dir).
  311
  312%!  no_agc(:Goal)
  313%
  314%   Run Goal with atom garbage collection   disabled. Loading an RDF
  315%   database creates large amounts  of  atoms   we  *know*  are  not
  316%   garbage.
  317
  318no_agc(Goal) :-
  319    current_prolog_flag(agc_margin, Old),
  320    setup_call_cleanup(
  321        set_prolog_flag(agc_margin, 0),
  322        Goal,
  323        set_prolog_flag(agc_margin, Old)).
  324
  325
  326%!  rdf_detach_db is det.
  327%
  328%   Detach from the  current  database.   Succeeds  silently  if  no
  329%   database is attached. Normally called at  the end of the program
  330%   through at_halt/1.
  331
  332rdf_detach_db :-
  333    debug(halt, 'Detaching RDF database', []),
  334    stop_monitor,
  335    close_journals,
  336    (   retract(rdf_directory(Dir))
  337    ->  debug(halt, 'DB Directory: ~w', [Dir]),
  338        save_prefixes(Dir),
  339        retractall(rdf_option(_)),
  340        retractall(source_journal_fd(_,_)),
  341        unlock_db(Dir)
  342    ;   true
  343    ).
  344
  345
  346%!  rdf_current_db(?Dir)
  347%
  348%   True if Dir is the current RDF persistent database.
  349
  350rdf_current_db(Directory) :-
  351    rdf_directory(Dir),
  352    !,
  353    Dir = Directory.
  354
  355
  356%!  rdf_flush_journals(+Options)
  357%
  358%   Flush dirty journals.  Options:
  359%
  360%           * min_size(+KB)
  361%           Only flush if journal is over KB in size.
  362%           * graph(+Graph)
  363%           Only flush the journal of Graph
  364%
  365%   @tbd Provide a default for min_size?
  366
  367rdf_flush_journals(Options) :-
  368    option(graph(Graph), Options, _),
  369    forall(rdf_graph(Graph),
  370           rdf_flush_journal(Graph, Options)).
  371
  372rdf_flush_journal(Graph, Options) :-
  373    db_files(Graph, _SnapshotFile, JournalFile),
  374    db_file(JournalFile, File),
  375    (   \+ exists_file(File)
  376    ->  true
  377    ;   memberchk(min_size(KB), Options),
  378        size_file(File, Size),
  379        Size / 1024 < KB
  380    ->  true
  381    ;   create_db(Graph)
  382    ).
  383
  384                 /*******************************
  385                 *             LOAD             *
  386                 *******************************/
  387
  388%!  load_db is det.
  389%
  390%   Reload database from the directory specified by rdf_directory/1.
  391%   First we find all names graphs using find_dbs/1 and then we load
  392%   them.
  393
  394load_db :-
  395    rdf_directory(Dir),
  396    concurrency(Jobs),
  397    cpu_stat_key(Jobs, StatKey),
  398    get_time(Wall0),
  399    statistics(StatKey, T0),
  400    load_prefixes(Dir),
  401    verbosity(Silent),
  402    find_dbs(Dir, Graphs, SnapShots, Journals),
  403    length(Graphs, GraphCount),
  404    maplist(rdf_unload_graph, Graphs),
  405    rdf_statistics(triples(Triples0)),
  406    load_sources(snapshots, SnapShots, Silent, Jobs),
  407    load_sources(journals, Journals, Silent, Jobs),
  408    rdf_statistics(triples(Triples1)),
  409    statistics(StatKey, T1),
  410    get_time(Wall1),
  411    T is T1 - T0,
  412    Wall is Wall1 - Wall0,
  413    Triples = Triples1 - Triples0,
  414    message_level(Silent, Level),
  415    print_message(Level, rdf(restore(attached(GraphCount, Triples, T/Wall)))).
  416
  417load_sources(_, [], _, _) :- !.
  418load_sources(Type, Sources, Silent, Jobs) :-
  419    length(Sources, Count),
  420    RunJobs is min(Count, Jobs),
  421    print_message(informational, rdf(restoring(Type, Count, RunJobs))),
  422    make_goals(Sources, Silent, 1, Count, Goals),
  423    concurrent(RunJobs, Goals, []).
  424
  425
  426%!  make_goals(+DBs, +Silent, +Index, +Total, -Goals)
  427
  428make_goals([], _, _, _, []).
  429make_goals([DB|T0], Silent, I,  Total,
  430           [load_source(DB, Silent, I, Total)|T]) :-
  431    I2 is I + 1,
  432    make_goals(T0, Silent, I2, Total, T).
  433
  434verbosity(Silent) :-
  435    rdf_option(silent(Silent)),
  436    !.
  437verbosity(Silent) :-
  438    current_prolog_flag(verbose, silent),
  439    !,
  440    Silent = true.
  441verbosity(brief).
  442
  443
  444%!  concurrency(-Jobs)
  445%
  446%   Number of jobs to run concurrently.
  447
  448concurrency(Jobs) :-
  449    rdf_option(concurrency(Jobs)),
  450    !.
  451concurrency(Jobs) :-
  452    current_prolog_flag(cpu_count, Jobs),
  453    Jobs > 0,
  454    !.
  455concurrency(1).
  456
  457cpu_stat_key(1, cputime) :- !.
  458cpu_stat_key(_, process_cputime).
  459
  460
  461%!  find_dbs(+Dir, -Graphs, -SnapBySize, -JournalBySize) is det.
  462%
  463%   Scan the persistent database and return a list of snapshots and
  464%   journals, both sorted by file-size.  Each term is of the form
  465%
  466%     ==
  467%     db(Size, Ext, DB, DBFile, Depth)
  468%     ==
  469
  470find_dbs(Dir, Graphs, SnapBySize, JournalBySize) :-
  471    directory_files(Dir, Files),
  472    phrase(scan_db_files(Files, Dir, '.', 0), Scanned),
  473    maplist(db_graph, Scanned, UnsortedGraphs),
  474    sort(UnsortedGraphs, Graphs),
  475    (   consider_reindex_db(Dir, Graphs, Scanned)
  476    ->  find_dbs(Dir, Graphs, SnapBySize, JournalBySize)
  477    ;   partition(db_is_snapshot, Scanned, Snapshots, Journals),
  478        sort(Snapshots, SnapBySize),
  479        sort(Journals, JournalBySize)
  480    ).
  481
  482consider_reindex_db(Dir, Graphs, Scanned) :-
  483    length(Graphs, Count),
  484    Count > 0,
  485    DepthNeeded is floor(log(Count)/log(256)),
  486    (   maplist(depth_db(DepthNow), Scanned)
  487    ->  (   DepthNeeded > DepthNow
  488        ->  true
  489        ;   retractall(rdf_option(directory_levels(_))),
  490            assertz(rdf_option(directory_levels(DepthNow))),
  491            fail
  492        )
  493    ;   true
  494    ),
  495    reindex_db(Dir, DepthNeeded).
  496
  497db_is_snapshot(Term) :-
  498    arg(2, Term, trp).
  499
  500db_graph(Term, DB) :-
  501    arg(3, Term, DB).
  502
  503db_file_name(Term, File) :-
  504    arg(4, Term, File).
  505
  506depth_db(Depth, DB) :-
  507    arg(5, DB, Depth).
  508
  509%!  scan_db_files(+Files, +Dir, +Prefix, +Depth)// is det.
  510%
  511%   Produces a list of db(DB,  Size,   File)  for all recognised RDF
  512%   database files.  File is relative to the database directory Dir.
  513
  514scan_db_files([], _, _, _) -->
  515    [].
  516scan_db_files([Nofollow|T], Dir, Prefix, Depth) -->
  517    { nofollow(Nofollow) },
  518    !,
  519    scan_db_files(T, Dir, Prefix, Depth).
  520scan_db_files([File|T], Dir, Prefix, Depth) -->
  521    { file_name_extension(Base, Ext, File),
  522      db_extension(Ext),
  523      !,
  524      rdf_db_to_file(DB, Base),
  525      directory_file_path(Prefix, File, DBFile),
  526      directory_file_path(Dir, DBFile, AbsFile),
  527      size_file(AbsFile, Size)
  528    },
  529    [ db(Size, Ext, DB, AbsFile, Depth) ],
  530    scan_db_files(T, Dir, Prefix, Depth).
  531scan_db_files([D|T], Dir, Prefix, Depth) -->
  532    { directory_file_path(Prefix, D, SubD),
  533      directory_file_path(Dir, SubD, AbsD),
  534      exists_directory(AbsD),
  535      \+ read_link(AbsD, _, _),    % Do not follow links
  536      !,
  537      directory_files(AbsD, SubFiles),
  538      SubDepth is Depth + 1
  539    },
  540    scan_db_files(SubFiles, Dir, SubD, SubDepth),
  541    scan_db_files(T, Dir, Prefix, Depth).
  542scan_db_files([_|T], Dir, Prefix, Depth) -->
  543    scan_db_files(T, Dir, Prefix, Depth).
  544
  545nofollow(.).
  546nofollow(..).
  547
  548db_extension(trp).
  549db_extension(jrn).
  550
  551:- public load_source/4.                % called through make_goals/5
  552
  553load_source(DB, Silent, Nth, Total) :-
  554    db_file_name(DB, File),
  555    db_graph(DB, Graph),
  556    message_level(Silent, Level),
  557    graph_triple_count(Graph, Count0),
  558    statistics(cputime, T0),
  559    (   db_is_snapshot(DB)
  560    ->  print_message(Level, rdf(restore(Silent, snapshot(Graph, File)))),
  561        rdf_load_db(File)
  562    ;   print_message(Level, rdf(restore(Silent, journal(Graph, File)))),
  563        load_journal(File, Graph)
  564    ),
  565    statistics(cputime, T1),
  566    T is T1 - T0,
  567    graph_triple_count(Graph, Count1),
  568    Count is Count1 - Count0,
  569    print_message(Level, rdf(restore(Silent,
  570                                     done(Graph, T, Count, Nth, Total)))).
  571
  572
  573graph_triple_count(Graph, Count) :-
  574    rdf_statistics(triples_by_graph(Graph, Count)),
  575    !.
  576graph_triple_count(_, 0).
  577
  578
  579%!  attach_graph(+Graph, +Options) is det.
  580%
  581%   Load triples and reload  journal   from  the  indicated snapshot
  582%   file.
  583
  584attach_graph(Graph, Options) :-
  585    (   option(silent(true), Options)
  586    ->  Level = silent
  587    ;   Level = informational
  588    ),
  589    db_files(Graph, SnapshotFile, JournalFile),
  590    rdf_retractall(_,_,_,Graph),
  591    statistics(cputime, T0),
  592    print_message(Level, rdf(restore(Silent, Graph))),
  593    db_file(SnapshotFile, AbsSnapShot),
  594    (   exists_file(AbsSnapShot)
  595    ->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
  596        rdf_load_db(AbsSnapShot)
  597    ;   true
  598    ),
  599    (   exists_db(JournalFile)
  600    ->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
  601        load_journal(JournalFile, Graph)
  602    ;   true
  603    ),
  604    statistics(cputime, T1),
  605    T is T1 - T0,
  606    (   rdf_statistics(triples_by_graph(Graph, Count))
  607    ->  true
  608    ;   Count = 0
  609    ),
  610    print_message(Level, rdf(restore(Silent,
  611                                     done(Graph, T, Count)))).
  612
  613message_level(true, silent) :- !.
  614message_level(_, informational).
  615
  616
  617                 /*******************************
  618                 *         LOAD JOURNAL         *
  619                 *******************************/
  620
  621%!  load_journal(+File:atom, +DB:atom) is det.
  622%
  623%   Process transactions from the RDF journal File, adding the given
  624%   named graph.
  625
  626load_journal(File, DB) :-
  627    rdf_create_graph(DB),
  628    setup_call_cleanup(
  629        open(File, read, In, [encoding(utf8)]),
  630        ( read(In, T0),
  631          process_journal(T0, In, DB)
  632        ),
  633        close(In)).
  634
  635process_journal(end_of_file, _, _) :- !.
  636process_journal(Term, In, DB) :-
  637    (   process_journal_term(Term, DB)
  638    ->  true
  639    ;   throw(error(type_error(journal_term, Term), _))
  640    ),
  641    read(In, T2),
  642    process_journal(T2, In, DB).
  643
  644process_journal_term(assert(S,P,O), DB) :-
  645    rdf_assert(S,P,O,DB).
  646process_journal_term(assert(S,P,O,Line), DB) :-
  647    rdf_assert(S,P,O,DB:Line).
  648process_journal_term(retract(S,P,O), DB) :-
  649    rdf_retractall(S,P,O,DB).
  650process_journal_term(retract(S,P,O,Line), DB) :-
  651    rdf_retractall(S,P,O,DB:Line).
  652process_journal_term(update(S,P,O,Action), DB) :-
  653    (   rdf_update(S,P,O,DB, Action)
  654    ->  true
  655    ;   print_message(warning, rdf(update_failed(S,P,O,Action)))
  656    ).
  657process_journal_term(start(_), _).      % journal open/close
  658process_journal_term(end(_), _).
  659process_journal_term(begin(_), _).      % logged transaction (compatibility)
  660process_journal_term(end, _).
  661process_journal_term(begin(_,_,_,_), _). % logged transaction (current)
  662process_journal_term(end(_,_,_), _).
  663
  664
  665                 /*******************************
  666                 *         CREATE JOURNAL       *
  667                 *******************************/
  668
  669:- dynamic
  670    blocked_db/2,                   % DB, Reason
  671    transaction_message/3,          % Nesting, Time, Message
  672    transaction_db/3.               % Nesting, DB, Id
  673
  674%!  rdf_persistency(+DB, Bool)
  675%
  676%   Specify whether a database is persistent.  Switching to =false=
  677%   kills the persistent state.  Switching to =true= creates it.
  678
  679rdf_persistency(DB, Bool) :-
  680    must_be(atom, DB),
  681    must_be(boolean, Bool),
  682    fail.
  683rdf_persistency(DB, false) :-
  684    !,
  685    (   blocked_db(DB, persistency)
  686    ->  true
  687    ;   assert(blocked_db(DB, persistency)),
  688        delete_db(DB)
  689    ).
  690rdf_persistency(DB, true) :-
  691    (   retract(blocked_db(DB, persistency))
  692    ->  create_db(DB)
  693    ;   true
  694    ).
  695
  696%!  rdf_db:property_of_graph(?Property, +Graph) is nondet.
  697%
  698%   Extend rdf_graph_property/2 with new properties.
  699
  700:- multifile
  701    rdf_db:property_of_graph/2.  702
  703rdf_db:property_of_graph(persistent(State), Graph) :-
  704    (   blocked_db(Graph, persistency)
  705    ->  State = false
  706    ;   State = true
  707    ).
  708
  709
  710%!  start_monitor is det.
  711%!  stop_monitor is det.
  712%
  713%   Start/stop monitoring the RDF database   for  changes and update
  714%   the journal.
  715
  716start_monitor :-
  717    rdf_monitor(monitor,
  718                [ -assert(load)
  719                ]).
  720stop_monitor :-
  721    rdf_monitor(monitor,
  722                [ -all
  723                ]).
  724
  725%!  monitor(+Term) is semidet.
  726%
  727%   Handle an rdf_monitor/2 callback to  deal with persistency. Note
  728%   that the monitor calls that come   from rdf_db.pl that deal with
  729%   database changes are serialized.  They   do  come from different
  730%   threads though.
  731
  732monitor(Msg) :-
  733    debug(monitor, 'Monitor: ~p~n', [Msg]),
  734    fail.
  735monitor(assert(S,P,O,DB:Line)) :-
  736    !,
  737    \+ blocked_db(DB, _),
  738    journal_fd(DB, Fd),
  739    open_transaction(DB, Fd),
  740    format(Fd, '~q.~n', [assert(S,P,O,Line)]),
  741    sync_journal(DB, Fd).
  742monitor(assert(S,P,O,DB)) :-
  743    \+ blocked_db(DB, _),
  744    journal_fd(DB, Fd),
  745    open_transaction(DB, Fd),
  746    format(Fd, '~q.~n', [assert(S,P,O)]),
  747    sync_journal(DB, Fd).
  748monitor(retract(S,P,O,DB:Line)) :-
  749    !,
  750    \+ blocked_db(DB, _),
  751    journal_fd(DB, Fd),
  752    open_transaction(DB, Fd),
  753    format(Fd, '~q.~n', [retract(S,P,O,Line)]),
  754    sync_journal(DB, Fd).
  755monitor(retract(S,P,O,DB)) :-
  756    \+ blocked_db(DB, _),
  757    journal_fd(DB, Fd),
  758    open_transaction(DB, Fd),
  759    format(Fd, '~q.~n', [retract(S,P,O)]),
  760    sync_journal(DB, Fd).
  761monitor(update(S,P,O,DB:Line,Action)) :-
  762    !,
  763    \+ blocked_db(DB, _),
  764    (   Action = graph(NewDB)
  765    ->  monitor(assert(S,P,O,NewDB)),
  766        monitor(retract(S,P,O,DB:Line))
  767    ;   journal_fd(DB, Fd),
  768        format(Fd, '~q.~n', [update(S,P,O,Action)]),
  769        sync_journal(DB, Fd)
  770    ).
  771monitor(update(S,P,O,DB,Action)) :-
  772    \+ blocked_db(DB, _),
  773    (   Action = graph(NewDB)
  774    ->  monitor(assert(S,P,O,NewDB)),
  775        monitor(retract(S,P,O,DB))
  776    ;   journal_fd(DB, Fd),
  777        open_transaction(DB, Fd),
  778        format(Fd, '~q.~n', [update(S,P,O,Action)]),
  779        sync_journal(DB, Fd)
  780    ).
  781monitor(load(BE, _DumpFileURI)) :-
  782    (   BE = end(Graphs)
  783    ->  sync_loaded_graphs(Graphs)
  784    ;   true
  785    ).
  786monitor(create_graph(Graph)) :-
  787    \+ blocked_db(Graph, _),
  788    journal_fd(Graph, Fd),
  789    open_transaction(Graph, Fd),
  790    sync_journal(Graph, Fd).
  791monitor(reset) :-
  792    forall(rdf_graph(Graph), delete_db(Graph)).
  793                                        % TBD: Remove empty directories?
  794
  795monitor(transaction(BE, Id)) :-
  796    monitor_transaction(Id, BE).
  797
  798monitor_transaction(load_journal(DB), begin(_)) :-
  799    !,
  800    assert(blocked_db(DB, journal)).
  801monitor_transaction(load_journal(DB), end(_)) :-
  802    !,
  803    retractall(blocked_db(DB, journal)).
  804
  805monitor_transaction(parse(URI), begin(_)) :-
  806    !,
  807    (   blocked_db(URI, persistency)
  808    ->  true
  809    ;   assert(blocked_db(URI, parse))
  810    ).
  811monitor_transaction(parse(URI), end(_)) :-
  812    !,
  813    (   retract(blocked_db(URI, parse))
  814    ->  create_db(URI)
  815    ;   true
  816    ).
  817monitor_transaction(unload(DB), begin(_)) :-
  818    !,
  819    (   blocked_db(DB, persistency)
  820    ->  true
  821    ;   assert(blocked_db(DB, unload))
  822    ).
  823monitor_transaction(unload(DB), end(_)) :-
  824    !,
  825    (   retract(blocked_db(DB, unload))
  826    ->  delete_db(DB)
  827    ;   true
  828    ).
  829monitor_transaction(log(Msg), begin(N)) :-
  830    !,
  831    check_nested(N),
  832    get_time(Time),
  833    asserta(transaction_message(N, Time, Msg)).
  834monitor_transaction(log(_), end(N)) :-
  835    check_nested(N),
  836    retract(transaction_message(N, _, _)),
  837    !,
  838    findall(DB:Id, retract(transaction_db(N, DB, Id)), DBs),
  839    end_transactions(DBs, N).
  840monitor_transaction(log(Msg, DB), begin(N)) :-
  841    !,
  842    check_nested(N),
  843    get_time(Time),
  844    asserta(transaction_message(N, Time, Msg)),
  845    journal_fd(DB, Fd),
  846    open_transaction(DB, Fd).
  847monitor_transaction(log(Msg, _DB), end(N)) :-
  848    monitor_transaction(log(Msg), end(N)).
  849
  850
  851%!  check_nested(+Level) is semidet.
  852%
  853%   True if we must log this transaction.   This  is always the case
  854%   for toplevel transactions. Nested transactions   are only logged
  855%   if log_nested_transactions(true) is defined.
  856
  857check_nested(0) :- !.
  858check_nested(_) :-
  859    rdf_option(log_nested_transactions(true)).
  860
  861
  862%!  open_transaction(+DB, +Fd) is det.
  863%
  864%   Add a begin(Id, Level, Time,  Message)   term  if  a transaction
  865%   involves DB. Id is an incremental   integer, where each database
  866%   has its own counter. Level is the nesting level, Time a floating
  867%   point timestamp and Message te message   provided as argument to
  868%   the log message.
  869
  870open_transaction(DB, Fd) :-
  871    transaction_message(N, Time, Msg),
  872    !,
  873    (   transaction_db(N, DB, _)
  874    ->  true
  875    ;   next_transaction_id(DB, Id),
  876        assert(transaction_db(N, DB, Id)),
  877        RoundedTime is round(Time*100)/100,
  878        format(Fd, '~q.~n', [begin(Id, N, RoundedTime, Msg)])
  879    ).
  880open_transaction(_,_).
  881
  882
  883%!  next_transaction_id(+DB, -Id) is det.
  884%
  885%   Id is the number to user for  the next logged transaction on DB.
  886%   Transactions in each  named  graph   are  numbered  in sequence.
  887%   Searching the Id of the last transaction is performed by the 2nd
  888%   clause starting 1Kb from the end   and doubling this offset each
  889%   failure.
  890
  891:- dynamic
  892    current_transaction_id/2.  893
  894next_transaction_id(DB, Id) :-
  895    retract(current_transaction_id(DB, Last)),
  896    !,
  897    Id is Last + 1,
  898    assert(current_transaction_id(DB, Id)).
  899next_transaction_id(DB, Id) :-
  900    db_files(DB, _, Journal),
  901    exists_file(Journal),
  902    !,
  903    size_file(Journal, Size),
  904    open_db(Journal, read, In, []),
  905    call_cleanup(iterative_expand(In, Size, Last), close(In)),
  906    Id is Last + 1,
  907    assert(current_transaction_id(DB, Id)).
  908next_transaction_id(DB, 1) :-
  909    assert(current_transaction_id(DB, 1)).
  910
  911iterative_expand(_, 0, 0) :- !.
  912iterative_expand(In, Size, Last) :-     % Scan growing sections from the end
  913    Max is floor(log(Size)/log(2)),
  914    between(10, Max, Step),
  915    Offset is -(1<<Step),
  916    seek(In, Offset, eof, _),
  917    skip(In, 10),                   % records are line-based
  918    read(In, T0),
  919    last_transaction_id(T0, In, 0, Last),
  920    Last > 0,
  921    !.
  922iterative_expand(In, _, Last) :-        % Scan the whole file
  923    seek(In, 0, bof, _),
  924    read(In, T0),
  925    last_transaction_id(T0, In, 0, Last).
  926
  927last_transaction_id(end_of_file, _, Last, Last) :- !.
  928last_transaction_id(end(Id, _, _), In, _, Last) :-
  929    read(In, T1),
  930    last_transaction_id(T1, In, Id, Last).
  931last_transaction_id(_, In, Id, Last) :-
  932    read(In, T1),
  933    last_transaction_id(T1, In, Id, Last).
  934
  935
  936%!  end_transactions(+DBs:list(atom:id)) is det.
  937%
  938%   End a transaction that affected the  given list of databases. We
  939%   write the list of other affected databases as an argument to the
  940%   end-term to facilitate fast finding of the related transactions.
  941%
  942%   In each database, the transaction is   ended with a term end(Id,
  943%   Nesting, Others), where  Id  and   Nesting  are  the transaction
  944%   identifier and nesting (see open_transaction/2)  and Others is a
  945%   list of DB:Id,  indicating  other   databases  affected  by  the
  946%   transaction.
  947
  948end_transactions(DBs, N) :-
  949    end_transactions(DBs, DBs, N).
  950
  951end_transactions([], _, _).
  952end_transactions([DB:Id|T], DBs, N) :-
  953    journal_fd(DB, Fd),
  954    once(select(DB:Id, DBs, Others)),
  955    format(Fd, 'end(~q, ~q, ~q).~n', [Id, N, Others]),
  956    sync_journal(DB, Fd),
  957    end_transactions(T, DBs, N).
  958
  959
  960%!  sync_loaded_graphs(+Graphs)
  961%
  962%   Called after a binary triple has been loaded that added triples
  963%   to the given graphs.
  964
  965sync_loaded_graphs(Graphs) :-
  966    maplist(create_db, Graphs).
  967
  968
  969                 /*******************************
  970                 *         JOURNAL FILES        *
  971                 *******************************/
  972
  973%!  journal_fd(+DB, -Stream) is det.
  974%
  975%   Get an open stream to a journal. If the journal is not open, old
  976%   journals are closed to satisfy   the =max_open_journals= option.
  977%   Then the journal is opened in   =append= mode. Journal files are
  978%   always encoded as UTF-8 for  portability   as  well as to ensure
  979%   full coverage of Unicode.
  980
  981journal_fd(DB, Fd) :-
  982    source_journal_fd(DB, Fd),
  983    !.
  984journal_fd(DB, Fd) :-
  985    with_mutex(rdf_journal_file,
  986               journal_fd_(DB, Out)),
  987    Fd = Out.
  988
  989journal_fd_(DB, Fd) :-
  990    source_journal_fd(DB, Fd),
  991    !.
  992journal_fd_(DB, Fd) :-
  993    limit_fd_pool,
  994    db_files(DB, _Snapshot, Journal),
  995    open_db(Journal, append, Fd,
  996            [ close_on_abort(false)
  997            ]),
  998    time_stamp(Now),
  999    format(Fd, '~q.~n', [start([time(Now)])]),
 1000    assert(source_journal_fd(DB, Fd)).              % new one at the end
 1001
 1002%!  limit_fd_pool is det.
 1003%
 1004%   Limit the number of  open   journals  to max_open_journals (10).
 1005%   Note that calls  from  rdf_monitor/2   are  issued  in different
 1006%   threads, but as they are part of write operations they are fully
 1007%   synchronised.
 1008
 1009limit_fd_pool :-
 1010    predicate_property(source_journal_fd(_, _), number_of_clauses(N)),
 1011    !,
 1012    (   rdf_option(max_open_journals(Max))
 1013    ->  true
 1014    ;   Max = 10
 1015    ),
 1016    Close is N - Max,
 1017    forall(between(1, Close, _),
 1018           close_oldest_journal).
 1019limit_fd_pool.
 1020
 1021close_oldest_journal :-
 1022    source_journal_fd(DB, _Fd),
 1023    !,
 1024    debug(rdf_persistency, 'Closing old journal for ~q', [DB]),
 1025    close_journal(DB).
 1026close_oldest_journal.
 1027
 1028
 1029%!  sync_journal(+DB, +Fd)
 1030%
 1031%   Sync journal represented by database and   stream.  If the DB is
 1032%   involved in a transaction there is   no point flushing until the
 1033%   end of the transaction.
 1034
 1035sync_journal(DB, _) :-
 1036    transaction_db(_, DB, _),
 1037    !.
 1038sync_journal(_, Fd) :-
 1039    flush_output(Fd).
 1040
 1041%!  close_journal(+DB) is det.
 1042%
 1043%   Close the journal associated with DB if it is open.
 1044
 1045close_journal(DB) :-
 1046    with_mutex(rdf_journal_file,
 1047               close_journal_(DB)).
 1048
 1049close_journal_(DB) :-
 1050    (   retract(source_journal_fd(DB, Fd))
 1051    ->  time_stamp(Now),
 1052        format(Fd, '~q.~n', [end([time(Now)])]),
 1053        close(Fd, [force(true)])
 1054    ;   true
 1055    ).
 1056
 1057%!  close_journals
 1058%
 1059%   Close all open journals.
 1060
 1061close_journals :-
 1062    forall(source_journal_fd(DB, _),
 1063           catch(close_journal(DB), E,
 1064                 print_message(error, E))).
 1065
 1066%!  create_db(+Graph)
 1067%
 1068%   Create a saved version of Graph in corresponding file, close and
 1069%   delete journals.
 1070
 1071create_db(Graph) :-
 1072    \+ rdf(_,_,_,Graph),
 1073    !,
 1074    debug(rdf_persistency, 'Deleting empty Graph ~w', [Graph]),
 1075    delete_db(Graph).
 1076create_db(Graph) :-
 1077    debug(rdf_persistency, 'Saving Graph ~w', [Graph]),
 1078    close_journal(Graph),
 1079    db_abs_files(Graph, Snapshot, Journal),
 1080    atom_concat(Snapshot, '.new', NewSnapshot),
 1081    (   catch(( create_directory_levels(Snapshot),
 1082                rdf_save_db(NewSnapshot, Graph)
 1083              ), Error,
 1084              ( print_message(warning, Error),
 1085                fail
 1086              ))
 1087    ->  (   exists_file(Journal)
 1088        ->  delete_file(Journal)
 1089        ;   true
 1090        ),
 1091        rename_file(NewSnapshot, Snapshot),
 1092        debug(rdf_persistency, 'Saved Graph ~w', [Graph])
 1093    ;   catch(delete_file(NewSnapshot), _, true)
 1094    ).
 1095
 1096
 1097%!  delete_db(+DB)
 1098%
 1099%   Remove snapshot and journal file for DB.
 1100
 1101delete_db(DB) :-
 1102    with_mutex(rdf_journal_file,
 1103               delete_db_(DB)).
 1104
 1105delete_db_(DB) :-
 1106    close_journal_(DB),
 1107    db_abs_files(DB, Snapshot, Journal),
 1108    !,
 1109    (   exists_file(Journal)
 1110    ->  delete_file(Journal)
 1111    ;   true
 1112    ),
 1113    (   exists_file(Snapshot)
 1114    ->  delete_file(Snapshot)
 1115    ;   true
 1116    ).
 1117delete_db_(_).
 1118
 1119                 /*******************************
 1120                 *             LOCKING          *
 1121                 *******************************/
 1122
 1123%!  lock_db(+Dir)
 1124%
 1125%   Lock the database directory Dir.
 1126
 1127lock_db(Dir) :-
 1128    lockfile(Dir, File),
 1129    catch(open(File, update, Out, [lock(write), wait(false)]),
 1130          error(permission_error(Access, _, _), _),
 1131          locked_error(Access, Dir)),
 1132    (   current_prolog_flag(pid, PID)
 1133    ->  true
 1134    ;   PID = 0                     % TBD: Fix in Prolog
 1135    ),
 1136    time_stamp(Now),
 1137    gethostname(Host),
 1138    format(Out, '/* RDF Database is in use */~n~n', []),
 1139    format(Out, '~q.~n', [ locked([ time(Now),
 1140                                    pid(PID),
 1141                                    host(Host)
 1142                                  ])
 1143                         ]),
 1144    flush_output(Out),
 1145    set_end_of_stream(Out),
 1146    assert(rdf_lock(Dir, lock(Out, File))),
 1147    at_halt(unlock_db(Dir)).
 1148
 1149locked_error(lock, Dir) :-
 1150    lockfile(Dir, File),
 1151    (   catch(read_file_to_terms(File, Terms, []), _, fail),
 1152        Terms = [locked(Args)]
 1153    ->  Context = rdf_locked(Args)
 1154    ;   Context = context(_, 'Database is in use')
 1155    ),
 1156    throw(error(permission_error(lock, rdf_db, Dir), Context)).
 1157locked_error(open, Dir) :-
 1158    throw(error(permission_error(lock, rdf_db, Dir),
 1159                context(_, 'Lock file cannot be opened'))).
 1160
 1161%!  unlock_db(+Dir) is det.
 1162%!  unlock_db(+Stream, +File) is det.
 1163
 1164unlock_db(Dir) :-
 1165    retract(rdf_lock(Dir, lock(Out, File))),
 1166    !,
 1167    unlock_db(Out, File).
 1168unlock_db(_).
 1169
 1170unlock_db(Out, File) :-
 1171    close(Out),
 1172    delete_file(File).
 1173
 1174                 /*******************************
 1175                 *           FILENAMES          *
 1176                 *******************************/
 1177
 1178lockfile(Dir, LockFile) :-
 1179    atomic_list_concat([Dir, /, lock], LockFile).
 1180
 1181directory_levels(Levels) :-
 1182    rdf_option(directory_levels(Levels)),
 1183    !.
 1184directory_levels(2).
 1185
 1186db_file(Base, File) :-
 1187    rdf_directory(Dir),
 1188    directory_levels(Levels),
 1189    db_file(Dir, Base, Levels, File).
 1190
 1191db_file(Dir, Base, Levels, File) :-
 1192    dir_levels(Base, Levels, Segments, [Base]),
 1193    atomic_list_concat([Dir|Segments], /, File).
 1194
 1195open_db(Base, Mode, Stream, Options) :-
 1196    db_file(Base, File),
 1197    create_directory_levels(File),
 1198    open(File, Mode, Stream, [encoding(utf8)|Options]).
 1199
 1200create_directory_levels(_File) :-
 1201    rdf_option(directory_levels(0)),
 1202    !.
 1203create_directory_levels(File) :-
 1204    file_directory_name(File, Dir),
 1205    make_directory_path(Dir).
 1206
 1207exists_db(Base) :-
 1208    db_file(Base, File),
 1209    exists_file(File).
 1210
 1211%!  dir_levels(+File, +Levels, ?Segments, ?Tail) is det.
 1212%
 1213%   Create a list of intermediate directory names for File.  Each
 1214%   directory consists of two hexadecimal digits.
 1215
 1216dir_levels(_, 0, Segments, Segments) :- !.
 1217dir_levels(File, Levels, Segments, Tail) :-
 1218    rdf_atom_md5(File, 1, Hash),
 1219    create_dir_levels(Levels, 0, Hash, Segments, Tail).
 1220
 1221create_dir_levels(0, _, _, Segments, Segments) :- !.
 1222create_dir_levels(N, S, Hash, [S1|Segments0], Tail) :-
 1223    sub_atom(Hash, S, 2, _, S1),
 1224    S2 is S+2,
 1225    N2 is N-1,
 1226    create_dir_levels(N2, S2, Hash, Segments0, Tail).
 1227
 1228
 1229%!  db_files(+DB, -Snapshot, -Journal).
 1230%!  db_files(-DB, +Snapshot, -Journal).
 1231%!  db_files(-DB, -Snapshot, +Journal).
 1232%
 1233%   True if named graph DB is represented  by the files Snapshot and
 1234%   Journal. The filenames are local   to the directory representing
 1235%   the store.
 1236
 1237db_files(DB, Snapshot, Journal) :-
 1238    nonvar(DB),
 1239    !,
 1240    rdf_db_to_file(DB, Base),
 1241    atom_concat(Base, '.trp', Snapshot),
 1242    atom_concat(Base, '.jrn', Journal).
 1243db_files(DB, Snapshot, Journal) :-
 1244    nonvar(Snapshot),
 1245    !,
 1246    atom_concat(Base, '.trp', Snapshot),
 1247    atom_concat(Base, '.jrn', Journal),
 1248    rdf_db_to_file(DB, Base).
 1249db_files(DB, Snapshot, Journal) :-
 1250    nonvar(Journal),
 1251    !,
 1252    atom_concat(Base, '.jrn', Journal),
 1253    atom_concat(Base, '.trp', Snapshot),
 1254    rdf_db_to_file(DB, Base).
 1255
 1256db_abs_files(DB, Snapshot, Journal) :-
 1257    db_files(DB, Snapshot0, Journal0),
 1258    db_file(Snapshot0, Snapshot),
 1259    db_file(Journal0, Journal).
 1260
 1261
 1262%!  rdf_journal_file(+Graph, -File) is semidet.
 1263%!  rdf_journal_file(-Graph, -File) is nondet.
 1264%
 1265%   True if File the name of the existing journal file for Graph.
 1266
 1267rdf_journal_file(Graph, Journal) :-
 1268    (   var(Graph)
 1269    ->  rdf_graph(Graph)
 1270    ;   true
 1271    ),
 1272    db_abs_files(Graph, _Snapshot, Journal),
 1273    exists_file(Journal).
 1274
 1275
 1276%!  rdf_snapshot_file(+Graph, -File) is semidet.
 1277%!  rdf_snapshot_file(-Graph, -File) is nondet.
 1278%
 1279%   True if File the name of the existing snapshot file for Graph.
 1280
 1281rdf_snapshot_file(Graph, Snapshot) :-
 1282    (   var(Graph)
 1283    ->  rdf_graph(Graph)    % also pick the empty graphs
 1284    ;   true
 1285    ),
 1286    db_abs_files(Graph, Snapshot, _Journal),
 1287    exists_file(Snapshot).
 1288
 1289
 1290%!  rdf_db_to_file(+DB, -File) is det.
 1291%!  rdf_db_to_file(-DB, +File) is det.
 1292%
 1293%   Translate between database encoding (often an   file or URL) and
 1294%   the name we store in the  directory.   We  keep  a cache for two
 1295%   reasons. Speed, but much more important   is that the mapping of
 1296%   raw --> encoded provided by  www_form_encode/2 is not guaranteed
 1297%   to be unique by the W3C standards.
 1298
 1299rdf_db_to_file(DB, File) :-
 1300    file_base_db(File, DB),
 1301    !.
 1302rdf_db_to_file(DB, File) :-
 1303    url_to_filename(DB, File),
 1304    assert(file_base_db(File, DB)).
 1305
 1306%!  url_to_filename(+URL, -FileName) is det.
 1307%!  url_to_filename(-URL, +FileName) is det.
 1308%
 1309%   Turn  a  valid  URL  into  a  filename.  Earlier  versions  used
 1310%   www_form_encode/2, but this can produce  characters that are not
 1311%   valid  in  filenames.  We  will  use    the   same  encoding  as
 1312%   www_form_encode/2,  but  using  our  own    rules   for  allowed
 1313%   characters. The only requirement is that   we avoid any filename
 1314%   special character in use.  The   current  encoding  use US-ASCII
 1315%   alnum characters, _ and %
 1316
 1317url_to_filename(URL, FileName) :-
 1318    atomic(URL),
 1319    !,
 1320    atom_codes(URL, Codes),
 1321    phrase(url_encode(EncCodes), Codes),
 1322    atom_codes(FileName, EncCodes).
 1323url_to_filename(URL, FileName) :-
 1324    uri_encoded(path, URL, FileName).
 1325
 1326url_encode([0'+|T]) -->
 1327    " ",
 1328    !,
 1329    url_encode(T).
 1330url_encode([C|T]) -->
 1331    alphanum(C),
 1332    !,
 1333    url_encode(T).
 1334url_encode([C|T]) -->
 1335    no_enc_extra(C),
 1336    !,
 1337    url_encode(T).
 1338url_encode(Enc) -->
 1339    (   "\r\n"
 1340    ;   "\n"
 1341    ),
 1342    !,
 1343    { string_codes("%0D%0A", Codes),
 1344      append(Codes, T, Enc)
 1345    },
 1346    url_encode(T).
 1347url_encode([]) -->
 1348    eos,
 1349    !.
 1350url_encode([0'%,D1,D2|T]) -->
 1351    [C],
 1352    { Dv1 is (C>>4 /\ 0xf),
 1353      Dv2 is (C /\ 0xf),
 1354      code_type(D1, xdigit(Dv1)),
 1355      code_type(D2, xdigit(Dv2))
 1356    },
 1357    url_encode(T).
 1358
 1359eos([], []).
 1360
 1361alphanum(C) -->
 1362    [C],
 1363    { C < 128,                      % US-ASCII
 1364      code_type(C, alnum)
 1365    }.
 1366
 1367no_enc_extra(0'_) --> "_".
 1368
 1369
 1370                 /*******************************
 1371                 *             REINDEX          *
 1372                 *******************************/
 1373
 1374%!  reindex_db(+Dir, +Levels)
 1375%
 1376%   Reindex the database by creating intermediate directories.
 1377
 1378reindex_db(Dir, Levels) :-
 1379    directory_files(Dir, Files),
 1380    reindex_files(Files, Dir, '.', 0, Levels),
 1381    remove_empty_directories(Files, Dir).
 1382
 1383reindex_files([], _, _, _, _).
 1384reindex_files([Nofollow|Files], Dir, Prefix, CLevel, Levels) :-
 1385    nofollow(Nofollow),
 1386    !,
 1387    reindex_files(Files, Dir, Prefix, CLevel, Levels).
 1388reindex_files([File|Files], Dir, Prefix, CLevel, Levels) :-
 1389    CLevel \== Levels,
 1390    file_name_extension(_Base, Ext, File),
 1391    db_extension(Ext),
 1392    !,
 1393    directory_file_path(Prefix, File, DBFile),
 1394    directory_file_path(Dir, DBFile, OldPath),
 1395    db_file(Dir, File, Levels, NewPath),
 1396    debug(rdf_persistency, 'Rename ~q --> ~q', [OldPath, NewPath]),
 1397    file_directory_name(NewPath, NewDir),
 1398    make_directory_path(NewDir),
 1399    rename_file(OldPath, NewPath),
 1400    reindex_files(Files, Dir, Prefix, CLevel, Levels).
 1401reindex_files([D|Files], Dir, Prefix, CLevel, Levels) :-
 1402    directory_file_path(Prefix, D, SubD),
 1403    directory_file_path(Dir, SubD, AbsD),
 1404    exists_directory(AbsD),
 1405    \+ read_link(AbsD, _, _),      % Do not follow links
 1406    !,
 1407    directory_files(AbsD, SubFiles),
 1408    CLevel2 is CLevel + 1,
 1409    reindex_files(SubFiles, Dir, SubD, CLevel2, Levels),
 1410    reindex_files(Files, Dir, Prefix, CLevel, Levels).
 1411reindex_files([_|Files], Dir, Prefix, CLevel, Levels) :-
 1412    reindex_files(Files, Dir, Prefix, CLevel, Levels).
 1413
 1414
 1415remove_empty_directories([], _).
 1416remove_empty_directories([File|Files], Dir) :-
 1417    \+ nofollow(File),
 1418    directory_file_path(Dir, File, Path),
 1419    exists_directory(Path),
 1420    \+ read_link(Path, _, _),
 1421    !,
 1422    directory_files(Path, Content),
 1423    exclude(nofollow, Content, RealContent),
 1424    (   RealContent == []
 1425    ->  debug(rdf_persistency, 'Remove empty dir ~q', [Path]),
 1426        delete_directory(Path)
 1427    ;   remove_empty_directories(RealContent, Path)
 1428    ),
 1429    remove_empty_directories(Files, Dir).
 1430remove_empty_directories([_|Files], Dir) :-
 1431    remove_empty_directories(Files, Dir).
 1432
 1433
 1434                 /*******************************
 1435                 *            PREFIXES          *
 1436                 *******************************/
 1437
 1438save_prefixes(Dir) :-
 1439    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
 1440    setup_call_cleanup(open(PrefixFile, write, Out, [encoding(utf8)]),
 1441                       write_prefixes(Out),
 1442                       close(Out)).
 1443
 1444write_prefixes(Out) :-
 1445    format(Out, '% Snapshot of defined RDF prefixes~n~n', []),
 1446    forall(rdf_current_ns(Alias, URI),
 1447           format(Out, 'prefix(~q, ~q).~n', [Alias, URI])).
 1448
 1449%!  load_prefixes(+RDFDBDir) is det.
 1450%
 1451%   If the file RDFDBDir/prefixes.db exists,  load the prefixes. The
 1452%   prefixes are registered using rdf_register_ns/3. Possible errors
 1453%   because the prefix  definitions  have   changed  are  printed as
 1454%   warnings, retaining the  old  definition.   Note  that  changing
 1455%   prefixes generally requires reloading all RDF from the source.
 1456
 1457load_prefixes(Dir) :-
 1458    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
 1459    (   exists_file(PrefixFile)
 1460    ->  setup_call_cleanup(open(PrefixFile, read, In, [encoding(utf8)]),
 1461                           read_prefixes(In),
 1462                           close(In))
 1463    ;   true
 1464    ).
 1465
 1466read_prefixes(Stream) :-
 1467    read_term(Stream, T0, []),
 1468    read_prefixes(T0, Stream).
 1469
 1470read_prefixes(end_of_file, _) :- !.
 1471read_prefixes(prefix(Alias, URI), Stream) :-
 1472    !,
 1473    must_be(atom, Alias),
 1474    must_be(atom, URI),
 1475    catch(rdf_register_ns(Alias, URI, []), E,
 1476          print_message(warning, E)),
 1477    read_term(Stream, T, []),
 1478    read_prefixes(T, Stream).
 1479read_prefixes(Term, _) :-
 1480    domain_error(prefix_term, Term).
 1481
 1482
 1483                 /*******************************
 1484                 *              UTIL            *
 1485                 *******************************/
 1486
 1487%!  mkdir(+Directory)
 1488%
 1489%   Create a directory if it does not already exist.
 1490
 1491mkdir(Directory) :-
 1492    exists_directory(Directory),
 1493    !.
 1494mkdir(Directory) :-
 1495    make_directory(Directory).
 1496
 1497%!  time_stamp(-Integer)
 1498%
 1499%   Return time-stamp rounded to integer.
 1500
 1501time_stamp(Int) :-
 1502    get_time(Now),
 1503    Int is round(Now).
 1504
 1505
 1506                 /*******************************
 1507                 *            MESSAGES          *
 1508                 *******************************/
 1509
 1510:- multifile
 1511    prolog:message/3,
 1512    prolog:message_context/3. 1513
 1514prolog:message(rdf(Term)) -->
 1515    message(Term).
 1516
 1517message(restoring(Type, Count, Jobs)) -->
 1518    [ 'Restoring ~D ~w using ~D concurrent workers'-[Count, Type, Jobs] ].
 1519message(restore(attached(Graphs, Triples, Time/Wall))) -->
 1520    { catch(Percent is round(100*Time/Wall), _, Percent = 0) },
 1521    [ 'Loaded ~D graphs (~D triples) in ~2f sec. (~d% CPU = ~2f sec.)'-
 1522      [Graphs, Triples, Wall, Percent, Time] ].
 1523% attach_graph/2
 1524message(restore(true, Action)) -->
 1525    !,
 1526    silent_message(Action).
 1527message(restore(brief, Action)) -->
 1528    !,
 1529    brief_message(Action).
 1530message(restore(_, Graph)) -->
 1531    [ 'Restoring ~p ... '-[Graph], flush ].
 1532message(restore(_, snapshot(_))) -->
 1533    [ at_same_line, '(snapshot) '-[], flush ].
 1534message(restore(_, journal(_))) -->
 1535    [ at_same_line, '(journal) '-[], flush ].
 1536message(restore(_, done(_, Time, Count))) -->
 1537    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
 1538% load_source/4
 1539message(restore(_, snapshot(G, _))) -->
 1540    [ 'Restoring ~p\t(snapshot)'-[G], flush ].
 1541message(restore(_, journal(G, _))) -->
 1542    [ 'Restoring ~p\t(journal)'-[G], flush ].
 1543message(restore(_, done(_, Time, Count))) -->
 1544    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
 1545% journal handling
 1546message(update_failed(S,P,O,Action)) -->
 1547    [ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].
 1548% directory reindexing
 1549message(reindex(Count, Depth)) -->
 1550    [ 'Restructuring database with ~d levels (~D graphs)'-[Depth, Count] ].
 1551message(reindex(Depth)) -->
 1552    [ 'Fixing database directory structure (~d levels)'-[Depth] ].
 1553message(read_only) -->
 1554    [ 'Cannot write persistent store; continuing in read-only mode.', nl,
 1555      'All changes to the RDF store will be lost if this process terminates.'
 1556    ].
 1557
 1558silent_message(_Action) --> [].
 1559
 1560brief_message(done(Graph, _Time, _Count, Nth, Total)) -->
 1561    { file_base_name(Graph, Base) },
 1562    [ at_same_line,
 1563      '\r~p~`.t ~D of ~D graphs~72|'-[Base, Nth, Total],
 1564      flush
 1565    ].
 1566brief_message(_) --> [].
 1567
 1568
 1569prolog:message_context(rdf_locked(Args)) -->
 1570    { memberchk(time(Time), Args),
 1571      memberchk(pid(Pid), Args),
 1572      format_time(string(S), '%+', Time)
 1573    },
 1574    [ nl,
 1575      'locked at ~s by process id ~w'-[S,Pid]
 1576    ]