View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jeffrey Rosenwald, Jan Wielemaker
    4    E-mail:        jeffrose@acm.org
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2009-2019, Jeffrey Rosenwald
    7                   CWI, Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(paxos,
   37          [ paxos_get/1,                        % ?Term
   38            paxos_get/2,                        % +Key, -Value
   39            paxos_get/3,                        % +Key, -Value, +Options
   40            paxos_set/1,                        % ?Term
   41            paxos_set/2,                        % +Key, +Value
   42            paxos_set/3,                        % +Key, +Value, +Options
   43            paxos_on_change/2,                  % ?Term, +Goal
   44            paxos_on_change/3,                  % ?Key, ?Value, +Goal
   45
   46            paxos_initialize/1,			% +Options
   47
   48            paxos_admin_key/2,                  % ?Name, ?Key
   49            paxos_property/1,                   % ?Property
   50            paxos_quorum_ask/4,                 % ?Templ, +Msg, -Result, +Options
   51                                                % Hook support
   52            paxos_replicate_key/3               % +Nodes, ?Key, +Options
   53          ]).   54:- autoload(library(apply),[partition/4,maplist/3]).   55:- autoload(library(broadcast),
   56	    [ listen/3,
   57	      broadcast_request/1,
   58	      broadcast/1,
   59	      unlisten/1,
   60	      listen/2,
   61	      unlisten/2
   62	    ]).   63:- use_module(library(debug),[debug/3]).   64:- autoload(library(error),
   65	    [permission_error/3,resource_error/1,must_be/2]).   66:- autoload(library(lists),[select/3,nth1/3,max_list/2,member/2]).   67:- autoload(library(option),[option/2,option/3]).   68:- autoload(library(solution_sequences),[call_nth/2]).   69:- use_module(library(settings),[setting/4,setting/2]).   70
   71/** <module> A Replicated Data Store
   72
   73This module provides a replicated data store that is coordinated using a
    74variation on Lamport's Paxos consensus protocol.  The original method is
   75described in his paper entitled, "The   Part-time Parliament", which was
   76published in 1998. The algorithm is   tolerant of non-Byzantine failure.
    77That is, late or lost delivery or   reply,  but not senseless delivery or
   78reply. The present algorithm takes advantage  of the convenience offered
   79by multicast to the quorum's membership,   who  can remain anonymous and
    80who can come and go as they  please without affecting Liveness or Safety
   81properties.
   82
   83Paxos' quorum is a set of one or more attentive members, whose processes
   84respond to queries within some known time limit (< 20ms), which includes
   85roundtrip delivery delay. This property is   easy  to satisfy given that
   86every coordinator is necessarily a member of   the quorum as well, and a
   87quorum of one is  permitted.  An   inattentive  member  (e.g.  one whose
   88actions are late or lost) is deemed to be "not-present" for the purposes
   89of the present transaction and consistency   cannot  be assured for that
   90member. As long as there is at least one attentive member of the quorum,
   91then persistence of the database is assured.
   92
   93Each member maintains a ledger  of   terms  along with information about
   94when  they  were   originally   recorded.    The   member's   ledger  is
   95deterministic. That is to say  that  there   can  only  be one entry per
   96functor/arity combination. No member will  accept   a  new term proposal
   97that has a line number that is equal-to   or  lower-than the one that is
   98already recorded in the ledger.
   99
  100Paxos is a three-phase protocol:
  101
  102   1: A coordinator first prepares the quorum for a new proposal by
  103   broadcasting a proposed term. The quorum responds by returning the
  104   last known line number for that functor/arity combination that is
  105   recorded in their respective ledgers.
  106
  107   2: The coordinator selects the highest line number it receives,
  108   increments it by one, and then asks the quorum to finally accept the
  109   new term with the new line number. The quorum checks their respective
  110   ledgers once again and if there is still no other ledger entry for
  111   that functor/arity combination that is equal-to or higher than the
  112   specified line, then each member records the term in the ledger at
  113   the specified line. The member indicates consent by returning the
  114   specified line number back to the coordinator. If consent is withheld
  115   by a member, then the member returns a =nack= instead. The
  116   coordinator requires unanimous consent. If it isn't achieved then the
  117   proposal fails and the coordinator must start over from the
  118   beginning.
  119
  120   3: Finally, the coordinator concludes the successful negotiation by
  121   broadcasting the agreement to the quorum in the form of a
  122   paxos_changed(Key,Value) event. This is the only event that
  123   should be of interest to user programs.
  124
  125For practical reasons, we rely  on   the  partially synchronous behavior
  126(e.g. limited upper time bound for  replies) of broadcast_request/1 over
  127TIPC to ensure Progress. Perhaps more importantly,   we rely on the fact
  128that the TIPC broadcast listener state  machine guarantees the atomicity
  129of broadcast_request/1 at the process level, thus obviating the need for
  130external mutual exclusion mechanisms.
  131
   132_|Note that this algorithm does not guarantee the correctness of the value
  133proposed. It only guarantees that if   successful, the value proposed is
  134identical for all attentive members of the quorum.|_
  135
  136@author    Jeffrey Rosenwald (JeffRose@acm.org)
  137@license   BSD-2
  138@see       tipc_broadcast.pl, udp_broadcast.pl
  139*/
  140
  141:- meta_predicate
  142    paxos_on_change(?, 0),
  143    paxos_on_change(?, ?, 0).  144
  145:- multifile
  146    paxos_message_hook/3,               % +PaxOS, +TimeOut, -Message
  147    paxos_ledger_hook/5.                % +Op, ?Key, ?Gen, ?Value, ?Status
  148
  149:- setting(max_sets, nonneg, 20,
  150           "Max Retries to get to an agreement").  151:- setting(max_gets, nonneg, 5,
  152           "Max Retries to get a value from the forum").  153:- setting(response_timeout, float, 0.020,
  154           "Max time to wait for a response").  155:- setting(replication_rate, number, 1000,
  156           "Number of keys replicated per second").  157:- setting(death_half_life, number, 10,
  158           "Half-time for failure score").  159:- setting(death_score, number, 100,
  160           "Consider a node dead if cummulative failure \c
  161            score exceeds this number").  162
  163
  164%!  paxos_initialize(+Options) is det.
  165%
  166%   Initialize this Prolog process as a   paxos node. The initialization
  167%   requires an initialized and configured TIPC,  UDP or other broadcast
  168%   protocol. Calling this initialization may be  omitted, in which case
  169%   the equivant of paxos_initialize([]) is executed   lazily as part of
  170%   the first paxos operation.  Defined options:
  171%
  172%     - node(?NodeID)
  173%     When instantiated, this node rejoins the network with the given
  174%     node id. A fixed node idea should be used if the node is
  175%     configured for persistency and causes the new node to receive
  176%     updates for keys that have been created or modified since the
  177%     node left the network.  If NodeID is a variable it is unified
  178%     with the discovered NodeID.
  179%
  180%     NodeID must be a small non-negative integer as these identifiers
  181%     are used in bitmaps.
  182
  183:- dynamic  paxos_initialized/0.  184:- volatile paxos_initialized/0.  185
  186paxos_initialize(_Options) :-
  187    paxos_initialized,
  188    !.
  189paxos_initialize(Options) :-
  190    with_mutex(paxos, paxos_initialize_sync(Options)).
  191
  192paxos_initialize_sync(_Options) :-
  193    paxos_initialized,
  194    !.
  195paxos_initialize_sync(Options) :-
  196    at_halt(paxos_leave),
  197    listen(paxos, paxos(X), paxos_message(X)),
  198    paxos_assign_node(Options),
  199    start_replicator,
  200    asserta(paxos_initialized).
  201
  202paxos_initialize :-
  203    paxos_initialize([]).
  204
  205
  206		 /*******************************
  207		 *            ADMIN		*
  208		 *******************************/
  209
  210%!  paxos_get_admin(+Name, -Value) is semidet.
  211%!  paxos_set_admin(+Name, +Value) is semidet.
  212%
  213%   Set administrative keys. We use a wrapper  such that we can hide the
  214%   key identity.
  215
  216paxos_admin_key(quorum, '$paxos_quorum').
  217paxos_admin_key(dead,   '$paxos_dead_nodes').
  218
  219paxos_get_admin(Name, Value) :-
  220    paxos_admin_key(Name, Key),
  221    paxos_get(Key, Value).
  222
  223paxos_set_admin(Name, Value) :-
  224    paxos_admin_key(Name, Key),
  225    paxos_set(Key, Value).
  226
  227paxos_set_admin_bg(Name, Value) :-
  228    thread_create(ignore(paxos_set_admin(Name, Value)), _,
  229                  [ detached(true)
  230                  ]).
  231
  232
  233		 /*******************************
  234		 *           NODE DATA		*
  235		 *******************************/
  236
  237%!  node(?NodeId).
  238%!  quorum(?Bitmap).
  239%!  dead(?Bitmap).
  240%!  failed(?Bitmap).
  241%!  failed(?NodeId, ?LastTried, ?Score).
  242%
  243%   Track our identity as well as as  the   status  of  our peers in the
  244%   network. NodeId is a small integer. Multiple NodeIds are combined in
  245%   a Bitmap.
  246%
  247%     - node/1 is our identity.
  248%     - quorum/1 is the set of members of the quorum
  249%     - failed/1 is the set of members for which the last message was
  250%       not confirmed.
  251%     - failed/3 tracks individual failed nodes. If accumulates failures
  252%       until the node is marked _dead_.
  253%     - dead/1 is the set of members that is considered dead.
  254
  255:- dynamic
  256    node/1,                             % NodeID
  257    quorum/1,                           % Bitmap
  258    failed/1,                           % Bitmap
  259    failed/3,                           % NodeID, LastTried, Score
  260    leaving/0,                          % Node is leaving
  261    dead/1,                             % Bitmap
  262    salt/1.                             % Unique key
  263:- volatile
  264    node/1,
  265    quorum/1,
  266    failed/1,
  267    failed/3,
  268    leaving/0,
  269    dead/1,
  270    salt/1.  271
  272%!  paxos_assign_node(+Options) is det.
  273%
  274%   Assign a node for this  paxos  instance.   If  node  is  given as an
  275%   option, this is the node id that   is used. Otherwise the network is
  276%   analysed and the system selects a new node.
  277
  278paxos_assign_node(Options) :-
  279    (   option(node(Node), Options)
  280    ->  node(Node)
  281    ;   node(_)
  282    ),                                          % already done
  283    !.
  284paxos_assign_node(Options) :-
  285    between(1, 20, Retry),
  286    option(node(Node), Options, Node),
  287    (   node(_)
  288    ->  permission_error(set, paxos_node, Node)
  289    ;   true
  290    ),
  291    retractall(dead(_)),
  292    retractall(quorum(_)),
  293    retractall(failed(_)),
  294    retractall(failed(_,_,_)),
  295    retractall(leaving),
  296    Salt is random(1<<63),
  297    asserta(salt(Salt)),
  298    paxos_message(node(N,Q,D):From, 0.25, NodeQuery),
  299    findall(t(N,Q,D,From),
  300            broadcast_request(NodeQuery),
  301            Network),
  302    select(t(self,0,Salt,Me), Network, AllNodeStatus),
  303    partition(starting, AllNodeStatus, Starting, Running),
  304    nth_starting(Starting, Salt, Offset),
  305    retractall(salt(_)),
  306    debug(paxos(node), 'Me@~p; starting: ~p; running: ~p',
  307          [Me, Starting, Running]),
  308    arg_union(2, Running, Quorum),
  309    arg_union(3, Running, Dead),
  310    (   var(Node)
  311    ->  (   call_nth(( between(0, 1000, Node),
  312                       \+ memberchk(t(Node,_,_,_), Running),
  313                       Dead /\ (1<<Node) =:= 0),
  314                     Offset)
  315        ->  debug(paxos(node), 'Assigning myself node ~d', [Node])
  316        ;   resource_error(paxos_nodes)
  317        )
  318    ;   memberchk(t(Node,_,_,_), Running)
  319    ->  permission_error(set, paxos_node, Node)
  320    ;   Rejoin = true
  321    ),
  322    asserta(node(Node)),
  323    (   claim_node(Node, Me)
  324    ->  !,
  325        asserta(dead(Dead)),
  326        set_quorum(Node, Quorum),
  327        (   Rejoin == true
  328        ->  paxos_rejoin
  329        ;   true
  330        )
  331    ;   debug(paxos(node), 'Node ~p already claimed; retrying (~p)',
  332              [Node, Retry]),
  333        retractall(node(Node)),
  334        fail
  335    ).
  336
  337starting(t(self,_Quorum,_Salt,_Address)).
  338
  339nth_starting(Starting, Salt, N) :-
  340    maplist(arg(3), Starting, Salts),
  341    sort([Salt|Salts], Sorted),
  342    nth1(N, Sorted, Salt),
  343    !.
  344
  345claim_node(Node, Me) :-
  346    paxos_message(claim_node(Node, Ok):From, 0.25, NodeQuery),
  347    forall((   broadcast_request(NodeQuery),
  348               From \== Me,
  349               debug(paxos(node), 'Claim ~p ~p: ~p', [Node, From, Ok])
  350           ),
  351           Ok == true).
  352
  353set_quorum(Node, Quorum0) :-
  354    Quorum is Quorum0 \/ (1<<Node),
  355    debug(paxos(node), 'Adding ~d to quorum (now 0x~16r)', [Node, Quorum]),
  356    asserta(quorum(Quorum)),
  357    paxos_set_admin(quorum, Quorum).
  358
  359
  360%!  paxos_rejoin
  361%
  362%   Re-join the network.  Tasks:
  363%
  364%     - Remove myself from the dead list if I'm on there
  365%     - Tell the replicators we lost everything.
  366
  367paxos_rejoin :-
  368    node(Node),
  369    repeat,
  370        (   paxos_get_admin(dead, Dead0)
  371        ->  Dead is Dead0 /\ \(1<<Node),
  372            (   Dead == Dead0
  373            ->  true
  374            ;   paxos_set_admin(dead, Dead)
  375            )
  376        ;   true
  377        ),
  378    !.
  379
  380%!  paxos_leave is det.
  381%!  paxos_leave(+Node) is det.
  382%
  383%   Leave the network.  The  predicate   paxos_leave/0  is  called  from
  384%   at_halt/1 to ensure the node is  deleted   as  the process dies. The
  385%   paxos_leave/1 version is called  to  discard   other  nodes  if they
  386%   repeatedly did not respond to queries.
  387
  388paxos_leave :-
  389    node(Node),
  390    !,
  391    asserta(leaving),
  392    paxos_leave(Node),
  393    Set is 1<<Node,
  394    paxos_message(forget(Set), -, Forget),
  395    broadcast(Forget),
  396    unlisten(paxos),
  397    retractall(leaving).
  398paxos_leave.
  399
  400paxos_leave(Node) :-
  401    !,
  402    paxos_update_set(quorum, del(Node)),
  403    paxos_update_set(dead,   add(Node)).
  404paxos_leave(_).
  405
  406paxos_update_set(Set, How) :-
  407    repeat,
  408      Term =.. [Set,Value],
  409      call(Term),
  410      (   How = add(Node)
  411      ->  NewValue is Value \/  (1<<Node)
  412      ;   How = del(Node)
  413      ->  NewValue is Value /\ \(1<<Node)
  414      ),
  415      (   Value == NewValue
  416      ->  true
  417      ;   paxos_set_admin(Set, NewValue)
  418      ->  true
  419      ;   leaving
  420      ),
  421    !.
  422
  423		 /*******************************
  424		 *          NODE STATUS		*
  425		 *******************************/
  426
  427%!  update_failed(+Action, +Quorum, +Alive) is det.
  428%
  429%   We just sent the Quorum a  message  and   got  a  reply from the set
  430%   Alive.
  431%
  432%   @arg is one of `set`, `get` or `replicate` and indicates the
  433%   intended action.
  434
  435update_failed(Action, Quorum, Alive) :-
  436    Failed is Quorum /\ \Alive,
  437    alive(Alive),
  438    consider_dead(Failed),
  439    (   failed(Failed)
  440    ->  true
  441    ;   (   clause(failed(_Old), true, Ref)
  442        ->  asserta(failed(Failed)),
  443            erase(Ref),
  444            debug(paxos(node), 'Updated failed quorum to 0x~16r', [Failed])
  445        ;   asserta(failed(Failed))
  446        ),
  447        (   Action == set
  448        ->  start_replicator
  449        ;   true
  450        )
  451    ).
  452
  453consider_dead(0) :-
  454    !.
  455consider_dead(Failed) :-
  456    Node is lsb(Failed),
  457    consider_dead1(Node),
  458    Rest is Failed /\ \(1<<Node),
  459    consider_dead(Rest).
  460
  461consider_dead1(Node) :-
  462    clause(failed(Node, Last, Score), true, Ref),
  463    !,
  464    setting(death_half_life, HalfLife),
  465    setting(death_score, DeathScore),
  466    get_time(Now),
  467    Passed is Now-Last,
  468    NewScore is Score*(2**(-Passed/HalfLife)) + 10,
  469    asserta(failed(Node, Now, NewScore)),
  470    erase(Ref),
  471    (   NewScore < DeathScore
  472    ->  debug(paxos(node), 'Consider node ~d dead', [Node]),
  473        paxos_leave(Node)
  474    ;   true
  475    ).
  476consider_dead1(Node) :-
  477    get_time(Now),
  478    asserta(failed(Node, Now, 10)).
  479
  480alive(Bitmap) :-
  481    (   clause(failed(Node, _Last, _Score), true, Ref),
  482        Bitmap /\ (1<<Node) =\= 0,
  483        erase(Ref),
  484        fail
  485    ;   true
  486    ).
  487
  488
  489%!  life_quorum(-Quorum, -LifeQuorum) is det.
  490%
  491%   Find the Quorum and the living nodes   from  the Quorum. This is the
  492%   set for which we wait.  If  the   LifeQuorum  is  not  a majority we
  493%   address the whole Quorum.
  494%
  495%   @tbd At some point in time we must remove a node from the quorum.
  496
  497life_quorum(Quorum, LifeQuorum) :-
  498    quorum(Quorum),
  499    (   failed(Failed),
  500        Failed \== 0,
  501        LifeQuorum is Quorum /\ \Failed,
  502        majority(LifeQuorum, Quorum)
  503    ->  true
  504    ;   LifeQuorum = Quorum
  505    ).
  506
  507
  508		 /*******************************
  509		 *        NETWORK STATUS	*
  510		 *******************************/
  511
  512:- paxos_admin_key(quorum, Key),
  513   listen(paxos_changed(Key, Quorum),
  514          update_quorum(Quorum)).  515:- paxos_admin_key(dead, Key),
  516   listen(paxos_changed(Key, Death),
  517          update_dead(Death)).  518
  519update_quorum(Proposed) :-
  520    debug(paxos(node), 'Received quorum proposal 0x~16r', [Proposed]),
  521    quorum(Proposed),
  522    !.
  523update_quorum(Proposed) :-
  524    leaving,
  525    !,
  526    update(quorum(Proposed)).
  527update_quorum(Proposed) :-
  528    node(Node),
  529    Proposed /\ (1<<Node) =\= 0,
  530    !,
  531    update(quorum(Proposed)).
  532update_quorum(Proposed) :-
  533    node(Node),
  534    NewQuorum is Proposed \/ (1<<Node),
  535    update(quorum(NewQuorum)),
  536    debug(paxos(node), 'I''m not in the quorum! Proposing 0x~16r', [NewQuorum]),
  537    paxos_set_admin_bg(quorum, NewQuorum).
  538
  539update_dead(Proposed) :-
  540    debug(paxos(node), 'Received dead proposal 0x~16r', [Proposed]),
  541    dead(Proposed),
  542    !.
  543update_dead(Proposed) :-
  544    leaving,
  545    !,
  546    update(dead(Proposed)).
  547update_dead(Proposed) :-
  548    node(Node),
  549    Proposed /\ (1<<Node) =:= 0,
  550    !,
  551    update(dead(Proposed)).
  552update_dead(Proposed) :-
  553    node(Node),
  554    NewDead is Proposed /\ \(1<<Node),
  555    update(dead(NewDead)),
  556    paxos_set_admin_bg(dead, NewDead).
  557
  558update(Clause) :-
  559    functor(Clause, Name, Arity),
  560    functor(Generic, Name, Arity),
  561    (   clause(Generic, true, Ref)
  562    ->  asserta(Clause),
  563        erase(Ref)
  564    ;   asserta(Clause)
  565    ).
  566
  567%!  paxos_property(?Property)
  568%
  569%   True if Property is  a  current   property  for  the  paxos network.
  570%   Defined properties are:
  571%
  572%     - node(?NodeID)
  573%     - quorum(?NodeBitmap)
  574%     - failed(?NodeBitmap)
  575
  576paxos_property(node(NodeID)) :-
  577    node(NodeID).
  578paxos_property(quorum(Quorum)) :-
  579    quorum(Quorum).
  580paxos_property(failed(Nodes)) :-
  581    failed(Nodes).
  582
  583
  584		 /*******************************
  585		 *         INBOUND EVENTS	*
  586		 *******************************/
  587
  588%!  paxos_message(?Message)
  589%
  590%   Handle inbound actions from our peers.   Defines  values for Message
  591%   are:
  592%
  593%     - prepare(+Key,-Node,-Gen,+Value)
  594%     A request message to set Key to Value. Returns the current
  595%     generation at which we have a value or `0` for Gen and the
  596%     our node id for Node.
  597%     - accept(+Key,-Node,+Gen,-GenA,+Value)
  598%     A request message to set Key to Value if Gen is newer than
  599%     the generation we have for Key.  In that case GenA is Gen.
  600%     Otherwise we reject using GenA = `nack`.
  601%     - changed(+Key,+Gen,+Value,+Acceptors)
  602%     The leader got enough accepts for setting Key to Value at Gen.
  603%     Acceptors is the set of nodes that accepted this value.
  604%     - learn(+Key,-Node,+Gen,-GenA,+Value)
  605%     Request message peforming phase one for replication to learner
  606%     nodes.
  607%     - learned(+Key,+Gen,+Value,+Acceptors)
  608%     Phase two of the replication. Confirm the newly learned knowledge.
  609%     - retrieve(+Key,-Node,-Gen,-Value)
  610%     A request message to retrieve our value for Key.  Also provides
  611%     our node id and the generation.
  612%     - forget(+Nodes)
  613%     Forget the existence of Nodes.
  614%     - node(-Node,-Quorum,-Dead)
  615%     Get my view about the network.  Node is the (integer) node id of
  616%     this node, Quorum is the idea of the quorum and Dead is the idea
  617%     about non-responsive nodes.
  618%
  619%   @tbd: originally the changed was  handled  by   a  get  and when not
  620%   successful with a new set, named   _paxos_audit_. I don't really see
  621%   why we need this.
  622
  623paxos_message(prepare(Key,Node,Gen,Value)) :-
  624    node(Node),
  625    (   ledger(Key, Gen, _)
  626    ->  true
  627    ;   Gen = 0,
  628        ledger_create(Key, Gen, Value)
  629    ),
  630    debug(paxos, 'Prepared ~p-~p@~d', [Key,Value,Gen]).
  631paxos_message(accept(Key,Node,Gen,GenA,Value)) :-
  632    node(Node),
  633    (   ledger_update(Key, Gen, Value)
  634    ->  debug(paxos, 'Accepted ~p-~p@~d', [Key,Value,Gen]),
  635        GenA = Gen
  636    ;   debug(paxos, 'Rejected ~p-~p@~d', [Key,Value,Gen]),
  637        GenA = nack
  638    ).
  639paxos_message(changed(Key,Gen,Value,Acceptors)) :-
  640    debug(paxos, 'Changed ~p-~p@~d for ~p', [Key, Value, Gen, Acceptors]),
  641    ledger_update_holders(Key,Gen,Acceptors),
  642    broadcast(paxos_changed(Key,Value)).
  643paxos_message(learn(Key,Node,Gen,GenA,Value)) :-
  644    node(Node),
  645    debug(paxos, 'Learn ~p-~p@~p?', [Key, Value, Gen]),
  646    (   ledger_learn(Key,Gen,Value)
  647    ->  debug(paxos, 'Learned ~p-~p@~d', [Key,Value,Gen]),
  648        GenA = Gen
  649    ;   debug(paxos, 'Rejected ~p@~d', [Key, Gen]),
  650        GenA = nack
  651    ).
  652paxos_message(learned(Key,Gen,_Value,Acceptors)) :-
  653    ledger_update_holders(Key,Gen,Acceptors).
  654paxos_message(retrieve(Key,Node,K,Value)) :-
  655    node(Node),
  656    debug(paxos, 'Retrieving ~p', [Key]),
  657    ledger(Key,K,Value),
  658    debug(paxos, 'Retrieved ~p-~p@~d', [Key,Value,K]),
  659    !.
  660paxos_message(forget(Nodes)) :-
  661    ledger_forget(Nodes).
  662paxos_message(node(Node,Quorum,Dead)) :-
  663    (   node(Node),
  664        quorum(Quorum),
  665        dead(Dead)
  666    ->  true
  667    ;   salt(Salt),
  668        Node = self,
  669        Quorum = 0,
  670        Dead = Salt
  671    ).
  672paxos_message(claim_node(Node, Ok)) :-
  673    (   node(Node)
  674    ->  Ok = false
  675    ;   Ok = true
  676    ).
  677paxos_message(ask(Node, Message)) :-
  678    node(Node),
  679    broadcast_request(Message).
  680
  681
  682		 /*******************************
  683		 *     KEY-VALUE OPERATIONS	*
  684		 *******************************/
  685
  686%%  paxos_set(+Term) is semidet.
  687%
  688%   Equivalent to paxos_key(Term,Key), pasox_set(Key,Term).   I.e., Term
  689%   is a ground compound term and its   key is the name/arity pair. This
  690%   version provides compatibility with older versions of this library.
  691
  692%%  paxos_set(+Key, +Value) is semidet.
  693%%  paxos_set(+Key, +Value, +Options) is semidet.
  694%
  695%   negotiates to have Key-Value recorded in the  ledger for each of the
  696%   quorum's members. This predicate succeeds  if the quorum unanimously
  697%   accepts the proposed term. If no such   entry  exists in the Paxon's
  698%   ledger, then one is silently  created.   paxos_set/1  will retry the
  699%   transaction several times (default: 20)   before failing. Failure is
  700%   rare and is usually the result of a collision of two or more writers
  701%   writing to the same term at precisely  the same time. On failure, it
  702%   may be useful to wait some random period of time, and then retry the
  703%   transaction. By specifying a retry count   of zero, paxos_set/2 will
  704%   succeed iff the first ballot succeeds.
  705%
  706%   On   success,   paxos_set/1   will   also     broadcast   the   term
  707%   paxos_changed(Key,Value), to the quorum.
  708%
  709%   Options processed:
  710%
  711%     - retry(Retries)
  712%     is a non-negative integer specifying the number of retries that
  713%     will be performed before a set is abandoned.  Defaults to the
  714%     _setting_ `max_sets` (20).
  715%     - timeout(+Seconds)
  716%     Max time to wait for the forum to reply.  Defaults to the
  717%     _setting_ `response_timeout` (0.020, 20ms).
  718%
  719%   @arg Term is a compound  that   may  have  unbound variables.
  720%   @tbd If the Value is already current, should we simply do nothing?
  721
  722paxos_set(Term) :-
  723    paxos_key(Term, Key),
  724    paxos_set(Key, Term, []).
  725
  726paxos_set(Key, Value) :-
  727    paxos_set(Key, Value, []).
  728
% One prepare/accept round per attempt; the final cut commits to the
% first round that succeeds.
paxos_set(Key, Value, Options) :-
    must_be(ground, Key-Value),
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(TMO), Options, TMO),
    apply_default(Retries, max_sets),
    apply_default(TMO, response_timeout),
    paxos_message(prepare(Key,Np,Rp,Value), TMO, Prepare),
    % Each failure below backtracks here: Retries+1 attempts in total.
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      Alive \== 0,
      debug(paxos, 'Set: ~p -> ~p', [Key, Value]),
      % Phase 1: collect the generation each member has for Key.
      collect(Quorum, false, Np, Rp, Prepare, Rps, PrepNodes),
      debug(paxos, 'Set: quorum: 0x~16r, prepared by 0x~16r, gens ~p',
            [Quorum, PrepNodes, Rps]),
      majority(PrepNodes, Quorum),
      % The new generation is one above the highest reported one.
      max_list(Rps, K),
      succ(K, K1),
      % Phase 2: ask the live quorum to accept Value at generation K1;
      % collecting stops at the first `nack`.
      paxos_message(accept(Key,Na,K1,Ra,Value), TMO, Accept),
      collect(Alive, Ra == nack, Na, Ra, Accept, Ras, AcceptNodes),
      majority(AcceptNodes, Quorum),
      intersecting(PrepNodes, AcceptNodes),
      c_element(Ras, K, K1),
      % Phase 3: announce the agreement and update the failure status.
      broadcast(paxos(log(Key,Value,AcceptNodes,K1))),
      paxos_message(changed(Key,K1,Value,AcceptNodes), -, Changed),
      broadcast(Changed),
      update_failed(set, Quorum, AcceptNodes),
    !.
  757
  758apply_default(Var, Setting) :-
  759    var(Var),
  760    !,
  761    setting(Setting, Var).
  762apply_default(_, _).
  763
  764majority(SubSet, Set) :-
  765    popcount(SubSet) >= (popcount(Set)+2)//2.
  766
  767intersecting(Set1, Set2) :-
  768    Set1 /\ Set2 =\= 0.
  769
  770
%!  collect(+Quorum, :Stop, ?Node, ?Template, ?Message,
%!          -Result, -NodeSet) is semidet.
%
%   Perform a broadcast request using Message.   Node and Template share
%   with Message and extract the replying node and the result value from
%   Message. Result is the list of  instantiations for Template received
%   and NodeSet is the set (bitmask) of   Node values that replies, i.e.
%   |NodeSet| is length(Result). The transfer stops   if  all members of
%   the set Quorum responded or the configured timeout passed.  It also
%   stops as soon as a reply makes Stop true; that reply is not added to
%   Result or NodeSet.

collect(Quorum, Stop, Node, Template, Message, Result, NodeSet) :-
    State = state(0),                   % bitmap of nodes that replied
    L0 = [dummy|_],                     % head cell of the answer list
    Answers = list(L0),                 % arg 1 tracks the last cell
    (   broadcast_request(Message),
        debug(paxos(request), 'broadcast_request: ~p', [Message]),
        (   Stop
        ->  !,                          % drop the remaining replies
            fail
        ;   true
        ),
        % Copy Template: its bindings are undone when backtracking into
        % broadcast_request/1 for the next reply.
        duplicate_term(Template, Copy),
        NewLastCell = [Copy|_],
        arg(1, Answers, LastCell),
        % Append destructively (non-backtrackable) to the answer list.
        nb_linkarg(2, LastCell, NewLastCell),
        nb_linkarg(1, Answers, NewLastCell),
        arg(1, State, Replied0),
        Replied is Replied0 \/ (1<<Node),
        nb_setarg(1, State, Replied),
        Quorum /\ Replied =:= Quorum    % succeeds once all of Quorum replied
    ->  true
    ;   true
    ),
    arg(1, State, NodeSet),
    arg(1, Answers, [_]),               % close the answer list
    L0 = [_|Result].
  807
  808%!  paxos_quorum_ask(?Template, +Message, -Result, +Options)
  809%
  810%   Ask the paxos forum for their opinion.  This predicate is not really
  811%   part  of  the  paxos  protocol,  but    reuses  notably  the  quorum
  812%   maintenance mechanism of this library for   asking  questions to the
  813%   quorum (cluster). Message is the message to   be  asked. Result is a
  814%   list of copies of Template from the quorum. Options:
  815%
  816%     - timeout(+Seconds)
  817%       Max time to wait for a reply. Default is the setting
  818%       `response_timeout`.
  819%     - node(?Node)
  820%       Can be used to include the replying node into Template.
  821%     - quorum(?Quorum)
  822%       Set/query the interviewed quorum as a bitmask
  823
  824paxos_quorum_ask(Template, Message, Result, Options) :-
  825    option(timeout(TMO), Options, TMO),
  826    option(node(Node), Options, _),
  827    option(quorum(Quorum), Options, Quorum),
  828    apply_default(TMO, response_timeout),
  829    (   var(Quorum)
  830    ->  life_quorum(Quorum, _Alive)
  831    ;   true
  832    ),
  833    paxos_message(ask(Node, Message), TMO, BroadcastMessage),
  834    collect(Quorum, false, Node, Template, BroadcastMessage, Result, _PrepNodes).
  835
  836%!  paxos_get(?Term) is semidet.
  837%
%   Equivalent to paxos_key(Term,Key), paxos_get(Key,Term).   I.e., Term
  839%   is a compound term and its key  is the name/arity pair. This version
  840%   provides compatibility with older versions of this library.
  841
%!  paxos_get(+Key, ?Value) is semidet.
%!  paxos_get(+Key, ?Value, +Options) is semidet.
%
%   Unifies Value with the entry for Key retrieved from the Paxon's
%   ledger. If no such entry exists in the member's local cache, then
%   the quorum is asked to provide a value, which is verified for
%   consistency (majority vote). An implied paxos_set/2 follows. This
%   predicate succeeds if an entry for Key exists in the Paxon's ledger,
%   and fails otherwise.
  851%
  852%   Options processed:
  853%
%     - retry(Retries)
%     is a non-negative integer specifying the number of retries that
%     will be performed before the get is abandoned.  Defaults to the
%     _setting_ `max_gets` (5).
  858%     - timeout(+Seconds)
  859%     Max time to wait for the forum to reply.  Defaults to the
  860%     _setting_ `response_timeout` (0.020, 20ms).
  861%
  862%   @arg Term is a compound. Any unbound variables are unified with
  863%   those provided in the ledger entry.
  864
  865paxos_get(Term) :-
  866    paxos_key(Term, Key),
  867    paxos_get(Key, Term, []).
  868paxos_get(Key, Value) :-
  869    paxos_get(Key, Value, []).
  870
  871paxos_get(Key, Value, _) :-
  872    ledger(Key, _Line, Value),
  873    !.
  874paxos_get(Key, Value, Options) :-
  875    paxos_initialize,
  876    option(retry(Retries), Options, Retries),
  877    option(timeout(TMO), Options, TMO),
  878    apply_default(Retries, max_gets),
  879    apply_default(TMO, response_timeout),
  880    Msg = Line-Value,
  881    paxos_message(retrieve(Key,Nr,Line,Value), TMO, Retrieve),
  882    node(Node),
  883    between(0, Retries, _),
  884      life_quorum(Quorum, Alive),
  885      QuorumA is Alive /\ \(1<<Node),
  886      collect(QuorumA, false, Nr, Msg, Retrieve, Terms, RetrievedNodes),
  887      debug(paxos, 'Retrieved: ~p from 0x~16r', [Terms, RetrievedNodes]),
  888      highest_vote(Terms, _Line-MajorityValue, Count),
  889      debug(paxos, 'Best: ~p with ~d votes', [MajorityValue, Count]),
  890      Count >= (popcount(QuorumA)+2)//2,
  891      debug(paxos, 'Retrieve: accept ~p', [MajorityValue]),
  892      update_failed(get, Quorum, RetrievedNodes),
  893      paxos_set(Key, MajorityValue),    % Is this needed?
  894    !.
  895
  896highest_vote(Terms, Term, Count) :-
  897    msort(Terms, Sorted),
  898    count_votes(Sorted, Counted),
  899    sort(1, >, Counted, [Count-Term|_]).
  900
  901count_votes([], []).
  902count_votes([H|T0], [N-H|T]) :-
  903    count_same(H, T0, 1, N, R),
  904    count_votes(R, T).
  905
  906count_same(H, [Hc|T0], C0, C, R) :-
  907    H == Hc,
  908    !,
  909    C1 is C0+1,
  910    count_same(H, T0, C1, C, R).
  911count_same(_, R, C, C, R).
  912
  913%!  paxos_key(+Term, -Key) is det.
  914%
  915%   Compatibility to allow for paxos_get/1, paxos_set/1, etc. The key of
  916%   a compound term is a term `'$c'(Name,Arity)`.   Note  that we do not
%   use `Name/Arity` because `X/Y` is naturally used to organize keys as
%   hierarchical _paths_.
  919
  920paxos_key(Compound, '$c'(Name,Arity)) :-
  921    compound(Compound), !,
  922    compound_name_arity(Compound, Name, Arity).
  923paxos_key(Compound, _) :-
  924    must_be(compound, Compound).
  925
  926
  927		 /*******************************
  928		 *          REPLICATION		*
  929		 *******************************/
  930
  931%!  start_replicator
  932%
  933%   Start or signal the replicator thread  that there may be outstanding
  934%   replication work.  This is the case if
  935%
  936%     - The union of _quorum_ and _learners_ was extended, and thus
  937%       all data may need to be replicated to the new members.
  938%     - A paxos_set/3 was not fully acknowledged.
  939
  940start_replicator :-
  941    catch(thread_send_message(paxos_replicator, run),
  942          error(existence_error(_,_),_),
  943          fail),
  944    !.
  945start_replicator :-
  946    catch(thread_create(replicator, _,
  947                        [ alias(paxos_replicator),
  948                          detached(true)
  949                        ]),
  950          error(permission_error(_,_,_),_),
  951          true).
  952
% replicator
%   Body of the paxos_replicator thread.  A failure-driven loop that never
%   succeeds: each round replicates at most one key to the live quorum and
%   then waits for a message (e.g. `run` from start_replicator/0).
replicator :-
    setting(replication_rate, ReplRate),
    ReplSleep is 1/ReplRate,            % presumably rate is keys/second -- confirm
    node(Node),
    debug(paxos(replicate), 'Starting replicator', []),
    State = state(idle),                % `idle` or #keys replicated (replicated/2)
    repeat,
      quorum(Quorum),
      dead(Dead),
      LifeQuorum is Quorum /\ \Dead,
      (   LifeQuorum /\ \(1<<Node) =:= 0
      ->  % No live quorum member besides ourselves: block until woken.
          debug(paxos(replicate),
                'Me: ~d, Quorum: 0x~16r, Dead: 0x~16r: I''m alone, waiting ...',
                [Node, Quorum, Dead]),
          thread_get_message(_)
      ;   (   paxos_replicate_key(LifeQuorum, Key, [])
          ->  % Replicated one key; pause ReplSleep seconds (or until a
              % message arrives) before the next round.
              replicated(State, key(Key)),
              thread_self(Me),
              thread_get_message(Me, _, [timeout(ReplSleep)])
          ;   % Nothing (left) to replicate: block until woken.
              replicated(State, idle),
              thread_get_message(_)
          )
      ),
      fail.
  977
  978replicated(State, key(_Key)) :-
  979    arg(1, State, idle),
  980    !,
  981    debug(paxos(replicate), 'Start replicating ...', []),
  982    nb_setarg(1, State, 1).
  983replicated(State, key(_Key)) :-
  984    !,
  985    arg(1, State, C0),
  986    C is C0+1,
  987    nb_setarg(1, State, C).
  988replicated(State, idle) :-
  989    arg(1, State, idle),
  990    !.
  991replicated(State, idle) :-
  992    arg(1, State, Count),
  993    debug(paxos(replicate), 'Replicated ~D keys', [Count]),
  994    nb_setarg(1, State, idle).
  995
  996
%!  paxos_replicate_key(+Nodes:bitmap, ?Key, +Options) is semidet.
%
%   Replicate a Key to Nodes.  If Key is not ground, a key that still
%   needs replication to Nodes is selected (pseudo-randomly).  Fails if
%   there is no such key.
 1001%
 1002%     - timeout(+Seconds)
 1003%     Max time to wait for the forum to reply.  Defaults to the
 1004%     _setting_ `response_timeout` (0.020, 20ms).
 1005
 1006paxos_replicate_key(Nodes, Key, Options) :-
 1007    replication_key(Nodes, Key),
 1008    option(timeout(TMO), Options, TMO),
 1009    apply_default(TMO, response_timeout),
 1010    ledger_current(Key, Gen, Value, Holders),
 1011    paxos_message(learn(Key,Na,Gen,Ga,Value), TMO, Learn),
 1012    collect(Nodes, Ga == nack, Na, Ga, Learn, _Gas, LearnedNodes),
 1013    NewHolders is Holders \/ LearnedNodes,
 1014    paxos_message(learned(Key,Gen,Value,NewHolders), -, Learned),
 1015    broadcast(Learned),
 1016    update_failed(replicate, Nodes, LearnedNodes).
 1017
 1018replication_key(_Nodes, Key) :-
 1019    ground(Key),
 1020    !.
 1021replication_key(Nodes, Key) :-
 1022    (   Nth is 1+random(popcount(Nodes))
 1023    ;   Nth = 1
 1024    ),
 1025    call_nth(needs_replicate(Nodes, Key), Nth),
 1026    !.
 1027
 1028needs_replicate(Nodes, Key) :-
 1029    ledger_current(Key, _Gen, _Value, Holders),
 1030    Nodes /\ \Holders =\= 0,
 1031    \+ paxos_admin_key(_, Key).
 1032
 1033
 1034		 /*******************************
 1035		 *      KEY CHANGE EVENTS	*
 1036		 *******************************/
 1037
 1038%!  paxos_on_change(?Term, :Goal) is det.
 1039%!  paxos_on_change(?Key, ?Value, :Goal) is det.
 1040%
 1041%   Executes the specified Goal  when   Key  changes.  paxos_on_change/2
 1042%   listens for paxos_changed(Key,Value) notifications   for  Key, which
 1043%   are emitted as the result   of  successful paxos_set/3 transactions.
 1044%   When one is received for Key, then   Goal  is executed in a separate
 1045%   thread of execution.
 1046%
 1047%   @arg Term is a compound, identical to that used for
 1048%   paxos_get/1.
 1049%   @arg Goal is one of:
 1050%     - a callable atom or term, or
 1051%     - the atom =ignore=, which causes monitoring for Term to be
 1052%       discontinued.
 1053
 1054paxos_on_change(Term, Goal) :-
 1055    paxos_key(Term, Key),
 1056    paxos_on_change(Key, Term, Goal).
 1057
 1058paxos_on_change(Key, Value, Goal) :-
 1059    Goal = _:Plain,
 1060    must_be(callable, Plain),
 1061    (   Plain == ignore
 1062    ->  unlisten(paxos_user, paxos_changed(Key,Value))
 1063    ;   listen(paxos_user, paxos_changed(Key,Value),
 1064               key_changed(Key, Value, Goal)),
 1065        paxos_initialize
 1066    ).
 1067
 1068key_changed(_Key, _Value, Goal) :-
 1069    E = error(_,_),
 1070    catch(thread_create(Goal, _, [detached(true)]),
 1071          E, key_error(E)).
 1072
 1073key_error(error(permission_error(create, thread, _), _)) :-
 1074    !.
 1075key_error(E) :-
 1076    print_message(error, E).
 1077
 1078
 1079		 /*******************************
 1080		 *            HOOKS		*
 1081		 *******************************/
 1082
 1083%!  node(-Node) is det.
 1084%
 1085%   Get the node ID for this paxos node.
 1086
 1087%!  quorum(-Quorum) is det.
 1088%
 1089%   Get the current quorum as a bitmask
 1090
 1091%!  paxos_message(+PaxOS, +TimeOut, -BroadcastMessage) is det.
 1092%
 1093%   Transform a basic PaxOS message in   a  message for the broadcasting
 1094%   service. This predicate is hooked   by paxos_message_hook/3 with the
 1095%   same signature.
 1096%
 1097%   @arg TimeOut is one of `-` or a time in seconds.
 1098
 1099paxos_message(Paxos:From, TMO, Message) :-
 1100    paxos_message_raw(paxos(Paxos):From, TMO, Message).
 1101paxos_message(Paxos, TMO, Message) :-
 1102    paxos_message_raw(paxos(Paxos), TMO, Message).
 1103
 1104paxos_message_raw(Message, TMO, WireMessage) :-
 1105    paxos_message_hook(Message, TMO, WireMessage),
 1106    !.
 1107paxos_message_raw(Message, TMO, WireMessage) :-
 1108    throw(error(mode_error(det, fail,
 1109                           paxos:paxos_message_hook(Message, TMO, WireMessage)), _)).
 1110
 1111
 1112		 /*******************************
 1113		 *           STORAGE		*
 1114		 *******************************/
 1115
 1116%!  paxos_ledger_hook(+Action, ?Key, ?Gen, ?Value, ?Holders)
 1117%
 1118%   Hook called for all operations on the ledger.  Defined actions are:
 1119%
 1120%     - current
 1121%       Enumerate our ledger content.
 1122%     - get
 1123%       Get a single value from our ledger.
 1124%     - create
 1125%       Create a new key in our ledger.
%     - accept
%       Accept a newly proposed value for a key.  Failure causes
%       the library to send a _NACK_ message.
%     - set
%       Final acceptance of Key@Gen, providing the holders that accepted
%       the new value.
 1132%     - learn
 1133%       Accept new keys in a new node or node that has been offline for
 1134%       some time.
 1135
 1136:- dynamic
 1137    paxons_ledger/4.                    % Key, Gen, Value, Holders
 1138
 1139%!  ledger_current(?Key, ?Gen, ?Value, ?Holders) is nondet.
 1140%
 1141%   True when Key is a known key in my ledger.
 1142
 1143ledger_current(Key, Gen, Value, Holders) :-
 1144    paxos_ledger_hook(current, Key, Gen, Value, Holders).
 1145ledger_current(Key, Gen, Value, Holders) :-
 1146    paxons_ledger(Key, Gen, Value, Holders),
 1147    valid(Holders).
 1148
 1149
 1150%!  ledger(+Key, -Gen, -Value) is semidet.
 1151%
 1152%   True if the ledger has Value associated  with Key at generation Gen.
 1153%   Note that if the value is  not   yet  acknowledged  by the leader we
 1154%   should not use it.
 1155
 1156ledger(Key, Gen, Value) :-
 1157    paxos_ledger_hook(get, Key, Gen, Value0, Holders),
 1158    !,
 1159    valid(Holders),
 1160    Value = Value0.
 1161ledger(Key, Gen, Value) :-
 1162    paxons_ledger(Key, Gen, Value0, Holders),
 1163    valid(Holders),
 1164    !,
 1165    Value = Value0.
 1166
 1167%!  ledger_create(+Key, +Gen, +Value) is det.
 1168%
 1169%   Create a new Key-Value pair  at   generation  Gen.  This is executed
 1170%   during the preparation phase.
 1171
 1172ledger_create(Key, Gen, Value) :-
 1173    paxos_ledger_hook(create, Key, Gen, Value, -),
 1174    !.
 1175ledger_create(Key, Gen, Value) :-
 1176    get_time(Now),
 1177    asserta(paxons_ledger(Key, Gen, Value, created(Now))).
 1178
 1179%!  ledger_update(+Key, +Gen, +Value) is semidet.
 1180%
 1181%   Update Key to Value if the  current   generation  is older than Gen.
 1182%   This reflects the accept phase of the protocol.
 1183
 1184ledger_update(Key, Gen, Value) :-
 1185    paxos_ledger_hook(accept, Key, Gen, Value, -),
 1186    !.
 1187ledger_update(Key, Gen, Value) :-
 1188    paxons_ledger(Key, Gen0, _Value, _Holders),
 1189    !,
 1190    Gen > Gen0,
 1191    get_time(Now),
 1192    asserta(paxons_ledger(Key, Gen, Value, accepted(Now))),
 1193    (   Gen0 == 0
 1194    ->  retractall(paxons_ledger(Key, Gen0, _, _))
 1195    ;   true
 1196    ).
 1197
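%!  ledger_update_holders(+Key, +Gen, +Holders) is semidet.
%
%   Record Holders as the bitmask of nodes holding Key@Gen.  This is used
%   when the leader acknowledged that Key@Gen represents a valid new
%   value, and when nodes are forgotten.  If paxos_ledger_hook/5 does not
%   handle the update, this fails when the local ledger has no entry for
%   Key@Gen.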
 1201
 1202ledger_update_holders(Key, Gen, Holders) :-
 1203    paxos_ledger_hook(set, Key, Gen, _, Holders),
 1204    !.
 1205ledger_update_holders(Key, Gen, Holders) :-
 1206    clause(paxons_ledger(Key, Gen, Value, Holders0), true, Ref),
 1207    !,
 1208    (   Holders0 == Holders
 1209    ->  true
 1210    ;   asserta(paxons_ledger(Key, Gen, Value, Holders)),
 1211        erase(Ref)
 1212    ),
 1213    clean_key(Holders0, Key, Gen).
 1214
 1215clean_key(Holders, _Key, _Gen) :-
 1216    valid(Holders),
 1217    !.
 1218clean_key(_, Key, Gen) :-
 1219    (   clause(paxons_ledger(Key, Gen0, _Value, _Holders0), true, Ref),
 1220        Gen0 < Gen,
 1221        erase(Ref),
 1222        fail
 1223    ;   true
 1224    ).
 1225
 1226
 1227%!  ledger_learn(+Key,+Gen,+Value) is semidet.
 1228%
 1229%   We received a learn event.
 1230
 1231ledger_learn(Key,Gen,Value) :-
 1232    paxos_ledger_hook(learn, Key, Gen, Value, -),
 1233    !.
 1234ledger_learn(Key,Gen,Value) :-
 1235    paxons_ledger(Key, Gen0, Value0, _Holders),
 1236    !,
 1237    (   Gen == Gen0,
 1238        Value == Value0
 1239    ->  true
 1240    ;   Gen > Gen0
 1241    ->  get_time(Now),
 1242        asserta(paxons_ledger(Key, Gen, Value, learned(Now)))
 1243    ).
 1244ledger_learn(Key,Gen,Value) :-
 1245    get_time(Now),
 1246    asserta(paxons_ledger(Key, Gen, Value, learned(Now))).
 1247
 1248%!  ledger_forget(+Nodes) is det.
 1249%
 1250%   Remove Nodes from all ledgers.  This is executed in a background
 1251%   thread.
 1252
 1253ledger_forget(Nodes) :-
 1254    catch(thread_create(ledger_forget_threaded(Nodes), _,
 1255                        [ detached(true)
 1256                        ]),
 1257          error(permission_error(create, thread, _), _),
 1258          true).
 1259
 1260ledger_forget_threaded(Nodes) :-
 1261    debug(paxos(node), 'Forgetting 0x~16r', [Nodes]),
 1262    forall(ledger_current(Key, Gen, _Value, Holders),
 1263           ledger_forget(Nodes, Key, Gen, Holders)),
 1264    debug(paxos(node), 'Forgotten 0x~16r', [Nodes]).
 1265
 1266ledger_forget(Nodes, Key, Gen, Holders) :-
 1267    NewHolders is Holders /\ \Nodes,
 1268    (   NewHolders \== Holders,
 1269        ledger_update_holders(Key, Gen, NewHolders)
 1270    ->  true
 1271    ;   true
 1272    ).
 1273
 1274valid(Holders) :-
 1275    integer(Holders).
 1276
 1277
 1278		 /*******************************
 1279		 *             UTIL		*
 1280		 *******************************/
 1281
 1282%!  c_element(+NewList, +Old, -Value)
 1283%
 1284%   A Muller c-element is a logic block  used in asynchronous logic. Its
 1285%   output assumes the value of its  input   iff  all  of its inputs are
 1286%   identical. Otherwise, the output retains its original value.
 1287
 1288c_element([New | More], _Old, New) :-
 1289    forall(member(N, More), N == New),
 1290    !.
 1291c_element(_List, Old, Old).
 1292
 1293%!  arg_union(+Arg, +ListOfTerms, -Set) is det.
 1294%
 1295%   Get all the nth args from ListOfTerms  and   do  a  set union on the
 1296%   result.
 1297
 1298arg_union(Arg, NodeStatusList, Set) :-
 1299    maplist(arg(Arg), NodeStatusList, Sets),
 1300    list_union(Sets, Set).
 1301
 1302list_union(Sets, Set) :-
 1303    list_union(Sets, 0, Set).
 1304
 1305list_union([], Set, Set).
 1306list_union([H|T], Set0, Set) :-
 1307    Set1 is Set0 \/ H,
 1308    list_union(T, Set1, Set)