/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald, Jan Wielemaker
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2009-2019, Jeffrey Rosenwald
                   CWI, Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(paxos,
          [ paxos_get/1,                        % ?Term
            paxos_get/2,                        % +Key, -Value
            paxos_get/3,                        % +Key, -Value, +Options
            paxos_set/1,                        % ?Term
            paxos_set/2,                        % +Key, +Value
            paxos_set/3,                        % +Key, +Value, +Options
            paxos_on_change/2,                  % ?Term, +Goal
            paxos_on_change/3,                  % ?Key, ?Value, +Goal

            paxos_initialize/1,                 % +Options

            paxos_admin_key/2,                  % ?Name, ?Key
            paxos_property/1,                   % ?Property
            paxos_quorum_ask/4,                 % ?Templ, +Msg, -Result, +Options
                                                % Hook support
            paxos_replicate_key/3               % +Nodes, ?Key, +Options
          ]).
:- use_module(library(broadcast)).
:- use_module(library(debug)).
:- use_module(library(lists)).
:- use_module(library(settings)).
:- use_module(library(option)).
:- use_module(library(error)).
:- use_module(library(apply)).
:- use_module(library(solution_sequences)).

A Replicated Data Store

This module provides a replicated data store that is coordinated using a variation on Lamport's Paxos consensus protocol. The original method is described in his paper entitled "The Part-time Parliament", which was published in 1998. The algorithm is tolerant of non-Byzantine failure. That is, it tolerates late or lost delivery or reply, but not senseless delivery or reply. The present algorithm takes advantage of the convenience offered by multicast to the quorum's membership, who can remain anonymous and who can come and go as they please without affecting Liveness or Safety properties.

Paxos' quorum is a set of one or more attentive members, whose processes respond to queries within some known time limit (< 20ms), which includes roundtrip delivery delay. This property is easy to satisfy given that every coordinator is necessarily a member of the quorum as well, and a quorum of one is permitted. An inattentive member (e.g. one whose actions are late or lost) is deemed to be "not-present" for the purposes of the present transaction and consistency cannot be assured for that member. As long as there is at least one attentive member of the quorum, then persistence of the database is assured.

Each member maintains a ledger of terms along with information about when they were originally recorded. The member's ledger is deterministic. That is to say that there can only be one entry per functor/arity combination. No member will accept a new term proposal that has a line number that is equal-to or lower-than the one that is already recorded in the ledger.
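The ledger rule above (one entry per key, and a new proposal is only accepted at a line strictly higher than the recorded one) can be sketched for a single member with a dynamic predicate. This is a minimal illustration, not the library's own ledger code; the names sketch_ledger/3 and sketch_accept/3 are hypothetical.

```prolog
% Hypothetical single-member ledger: at most one entry per Key.
:- dynamic sketch_ledger/3.            % Key, Line, Value

% sketch_accept(+Key, +Line, +Value) succeeds and records the entry only
% if Line is strictly higher than the line already recorded for Key.
sketch_accept(Key, Line, Value) :-
    (   sketch_ledger(Key, Old, _)
    ->  Line > Old,                    % equal-to or lower lines are rejected
        retractall(sketch_ledger(Key, _, _))
    ;   true                           % no entry yet: create one
    ),
    assertz(sketch_ledger(Key, Line, Value)).
```

For example, after recording `color` at line 1, a second proposal at line 1 is refused and a proposal at line 2 replaces the entry.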

Paxos is a three-phase protocol:

1: A coordinator first prepares the quorum for a new proposal by broadcasting a proposed term. The quorum responds by returning the last known line number for that functor/arity combination that is recorded in their respective ledgers.
2: The coordinator selects the highest line number it receives, increments it by one, and then asks the quorum to finally accept the new term with the new line number. The quorum checks their respective ledgers once again and if there is still no other ledger entry for that functor/arity combination that is equal-to or higher than the specified line, then each member records the term in the ledger at the specified line. The member indicates consent by returning the specified line number back to the coordinator. If consent is withheld by a member, then the member returns a nack instead. The coordinator requires unanimous consent. If it isn't achieved then the proposal fails and the coordinator must start over from the beginning.
3: Finally, the coordinator concludes the successful negotiation by broadcasting the agreement to the quorum in the form of a paxos_changed(Key,Value) event. This is the only event that should be of interest to user programs.
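The coordinator's arithmetic in steps 1 and 2 can be illustrated on replies that have already been collected. The predicates next_line/2 and unanimous/2 are hypothetical and skip all networking. They follow the prose above, whereas paxos_set/3 further down counts majorities over node bitmaps.

```prolog
:- use_module(library(lists)).

% Phase 1 -> 2: take the highest line number reported by the quorum
% and propose the next one.
next_line(PrepareGens, Line) :-
    max_list(PrepareGens, Highest),
    Line is Highest + 1.

% Phase 2: consent requires every member to echo the proposed Line;
% a single nack (or any other reply) makes the round fail.
unanimous(Line, AcceptReplies) :-
    AcceptReplies \== [],
    forall(member(Reply, AcceptReplies), Reply == Line).
```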

For practical reasons, we rely on the partially synchronous behavior (i.e. a limited upper time bound for replies) of broadcast_request/1 over TIPC to ensure Progress. Perhaps more importantly, we rely on the fact that the TIPC broadcast listener state machine guarantees the atomicity of broadcast_request/1 at the process level, thus obviating the need for external mutual exclusion mechanisms.

Note that this algorithm does not guarantee that the proposed value is correct. It only guarantees that, if successful, the proposed value is identical for all attentive members of the quorum.

author
- Jeffrey Rosenwald (JeffRose@acm.org)
See also
- tipc_broadcast.pl, udp_broadcast.pl
license
- BSD-2
:- meta_predicate
    paxos_on_change(?, 0),
    paxos_on_change(?, ?, 0).

:- multifile
    paxos_message_hook/3,               % +PaxOS, +TimeOut, -Message
    paxos_ledger_hook/5.                % +Op, ?Key, ?Gen, ?Value, ?Status

:- setting(max_sets, nonneg, 20,
           "Max Retries to get to an agreement").
:- setting(max_gets, nonneg, 5,
           "Max Retries to get a value from the forum").
:- setting(response_timeout, float, 0.020,
           "Max time to wait for a response").
:- setting(replication_rate, number, 1000,
           "Number of keys replicated per second").
:- setting(death_half_life, number, 10,
           "Half-time for failure score").
:- setting(death_score, number, 100,
           "Consider a node dead if cumulative failure \c
            score exceeds this number").
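The two death settings combine into a decaying failure score, following the update in consider_dead1/1: the previous score is halved every death_half_life seconds and each new failure adds 10. The sketch below uses the hypothetical helpers decayed_score/4 and exceeds_death_score/2 and assumes the "exceeds" reading given in the description of the death_score setting.

```prolog
% Hypothetical helper mirroring the score update in consider_dead1/1:
% Score = Score0 * 2^(-Passed/HalfLife) + 10
decayed_score(Score0, Passed, HalfLife, Score) :-
    Score is Score0 * (2.0 ** (-Passed/HalfLife)) + 10.

% Assumption: a node counts as dead once its score exceeds DeathScore,
% as the death_score setting's description states.
exceeds_death_score(Score, DeathScore) :-
    Score > DeathScore.
```

With failures arriving back to back (Passed close to 0), the score grows by about 10 per failure. Under this reading and the default settings, a node is considered dead after about eleven rapid failures, while failures that are spread out decay away.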
 paxos_initialize(+Options) is det
Initialize this Prolog process as a paxos node. The initialization requires an initialized and configured TIPC, UDP or other broadcast protocol. Calling this initialization may be omitted, in which case the equivalent of paxos_initialize([]) is executed lazily as part of the first paxos operation. Defined options:
node(?NodeID)
When instantiated, this node rejoins the network with the given node id. A fixed node id should be used if the node is configured for persistency; rejoining with it causes the node to receive updates for keys that have been created or modified since the node left the network. If NodeID is a variable, it is unified with the discovered NodeID.

NodeID must be a small non-negative integer as these identifiers are used in bitmaps.

:- dynamic  paxos_initialized/0.
:- volatile paxos_initialized/0.

paxos_initialize(_Options) :-
    paxos_initialized,
    !.
paxos_initialize(Options) :-
    with_mutex(paxos, paxos_initialize_sync(Options)).

paxos_initialize_sync(_Options) :-
    paxos_initialized,
    !.
paxos_initialize_sync(Options) :-
    at_halt(paxos_leave),
    listen(paxos, paxos(X), paxos_message(X)),
    paxos_assign_node(Options),
    start_replicator,
    asserta(paxos_initialized).

paxos_initialize :-
    paxos_initialize([]).


		 /*******************************
		 *            ADMIN		*
		 *******************************/
 paxos_get_admin(+Name, -Value) is semidet
 paxos_set_admin(+Name, +Value) is semidet
Get or set administrative keys. We use a wrapper so that we can hide the key identity.
paxos_admin_key(quorum, '$paxos_quorum').
paxos_admin_key(dead,   '$paxos_dead_nodes').

paxos_get_admin(Name, Value) :-
    paxos_admin_key(Name, Key),
    paxos_get(Key, Value).

paxos_set_admin(Name, Value) :-
    paxos_admin_key(Name, Key),
    paxos_set(Key, Value).

paxos_set_admin_bg(Name, Value) :-
    thread_create(ignore(paxos_set_admin(Name, Value)), _,
                  [ detached(true)
                  ]).


		 /*******************************
		 *           NODE DATA		*
		 *******************************/
 node(?NodeId)
 quorum(?Bitmap)
 dead(?Bitmap)
 failed(?Bitmap)
 failed(?NodeId, ?LastTried, ?Score)
Track our identity as well as the status of our peers in the network. NodeId is a small integer. Multiple NodeIds are combined in a Bitmap.
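Node N is represented by bit N of an integer, so the set operations used throughout this file reduce to bitwise arithmetic. The helper names below are hypothetical. The expressions themselves are the ones the code uses, for example `Value \/ (1<<Node)` in paxos_update_set/2 and `popcount/1` in majority/2.

```prolog
% Hypothetical helpers for NodeId bitmaps (node N is bit N).
node_set_add(Set0, Node, Set) :- Set is Set0 \/ (1 << Node).
node_set_del(Set0, Node, Set) :- Set is Set0 /\ \(1 << Node).
node_set_member(Node, Set)    :- Set /\ (1 << Node) =\= 0.
node_set_size(Set, Size)      :- Size is popcount(Set).
```

For example, the bitmap `0b1001` stands for nodes 0 and 3.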
:- dynamic
    node/1,                             % NodeID
    quorum/1,                           % Bitmap
    failed/1,                           % Bitmap
    failed/3,                           % NodeID, LastTried, Score
    leaving/0,                          % Node is leaving
    dead/1,                             % Bitmap
    salt/1.                             % Unique key
:- volatile
    node/1,
    quorum/1,
    failed/1,
    failed/3,
    leaving/0,
    dead/1,
    salt/1.
 paxos_assign_node(+Options) is det
Assign a node for this paxos instance. If node is given as an option, this is the node id that is used. Otherwise the network is analysed and the system selects a new node.
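The automatic choice of a node id in paxos_assign_node/1 can be sketched as follows. The sketch assumes the running nodes are given as a plain list of ids (the real code uses t(Node,Quorum,Dead,Address) terms). Offset is this process's position among concurrently starting nodes, so that simultaneous starters pick different ids. free_node/4 is a hypothetical name.

```prolog
:- use_module(library(solution_sequences)).

% Hypothetical: the Offset-th id in 0..1000 that is neither used by a
% running node nor marked in the Dead bitmap.
free_node(Running, Dead, Offset, Node) :-
    call_nth(( between(0, 1000, Node),
               \+ memberchk(Node, Running),
               Dead /\ (1 << Node) =:= 0
             ),
             Offset),
    !.
```

With nodes 0 and 1 running and node 2 dead, the first starter gets id 3 and the second gets id 4.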
paxos_assign_node(Options) :-
    (   option(node(Node), Options)
    ->  node(Node)
    ;   node(_)
    ),                                          % already done
    !.
paxos_assign_node(Options) :-
    between(1, 20, Retry),
    option(node(Node), Options, Node),
    (   node(_)
    ->  permission_error(set, paxos_node, Node)
    ;   true
    ),
    retractall(dead(_)),
    retractall(quorum(_)),
    retractall(failed(_)),
    retractall(failed(_,_,_)),
    retractall(leaving),
    Salt is random(1<<63),
    asserta(salt(Salt)),
    paxos_message(node(N,Q,D):From, 0.25, NodeQuery),
    findall(t(N,Q,D,From),
            broadcast_request(NodeQuery),
            Network),
    select(t(self,0,Salt,Me), Network, AllNodeStatus),
    partition(starting, AllNodeStatus, Starting, Running),
    nth_starting(Starting, Salt, Offset),
    retractall(salt(_)),
    debug(paxos(node), 'Me@~p; starting: ~p; running: ~p',
          [Me, Starting, Running]),
    arg_union(2, Running, Quorum),
    arg_union(3, Running, Dead),
    (   var(Node)
    ->  (   call_nth(( between(0, 1000, Node),
                       \+ memberchk(t(Node,_,_,_), Running),
                       Dead /\ (1<<Node) =:= 0),
                     Offset)
        ->  debug(paxos(node), 'Assigning myself node ~d', [Node])
        ;   resource_error(paxos_nodes)
        )
    ;   memberchk(t(Node,_,_,_), Running)
    ->  permission_error(set, paxos_node, Node)
    ;   Rejoin = true
    ),
    asserta(node(Node)),
    (   claim_node(Node, Me)
    ->  !,
        asserta(dead(Dead)),
        set_quorum(Node, Quorum),
        (   Rejoin == true
        ->  paxos_rejoin
        ;   true
        )
    ;   debug(paxos(node), 'Node ~p already claimed; retrying (~p)',
              [Node, Retry]),           % one ~p per argument
        retractall(node(Node)),
        fail
    ).

starting(t(self,_Quorum,_Salt,_Address)).

nth_starting(Starting, Salt, N) :-
    maplist(arg(3), Starting, Salts),
    sort([Salt|Salts], Sorted),
    nth1(N, Sorted, Salt),
    !.

claim_node(Node, Me) :-
    paxos_message(claim_node(Node, Ok):From, 0.25, NodeQuery),
    forall((   broadcast_request(NodeQuery),
               From \== Me,
               debug(paxos(node), 'Claim ~p ~p: ~p', [Node, From, Ok])
           ),
           Ok == true).

set_quorum(Node, Quorum0) :-
    Quorum is Quorum0 \/ (1<<Node),
    debug(paxos(node), 'Adding ~d to quorum (now 0x~16r)', [Node, Quorum]),
    asserta(quorum(Quorum)),
    paxos_set_admin(quorum, Quorum).
 paxos_rejoin
Re-join the network. Tasks:
paxos_rejoin :-
    node(Node),
    repeat,
        (   paxos_get_admin(dead, Dead0)
        ->  Dead is Dead0 /\ \(1<<Node),
            (   Dead == Dead0
            ->  true
            ;   paxos_set_admin(dead, Dead)
            )
        ;   true
        ),
    !.
 paxos_leave is det
 paxos_leave(+Node) is det
Leave the network. The predicate paxos_leave/0 is called from at_halt/1 to ensure the node is deleted as the process dies. The paxos_leave/1 version is called to discard other nodes if they repeatedly did not respond to queries.
paxos_leave :-
    node(Node),
    !,
    asserta(leaving),
    paxos_leave(Node),
    Set is 1<<Node,
    paxos_message(forget(Set), -, Forget),
    broadcast(Forget),
    unlisten(paxos),
    retractall(leaving).
paxos_leave.

paxos_leave(Node) :-
    !,
    paxos_update_set(quorum, del(Node)),
    paxos_update_set(dead,   add(Node)).
paxos_leave(_).

paxos_update_set(Set, How) :-
    repeat,
      Term =.. [Set,Value],
      call(Term),
      (   How = add(Node)
      ->  NewValue is Value \/  (1<<Node)
      ;   How = del(Node)
      ->  NewValue is Value /\ \(1<<Node)
      ),
      (   Value == NewValue
      ->  true
      ;   paxos_set_admin(Set, NewValue)
      ->  true
      ;   leaving
      ),
    !.

		 /*******************************
		 *          NODE STATUS		*
		 *******************************/
 update_failed(+Action, +Quorum, +Alive) is det
We just sent the Quorum a message and got a reply from the set Alive.
Arguments:
Action- is one of set, get or replicate and indicates the intended action.
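The first step of update_failed/3 is pure bitmap arithmetic. The nodes in Quorum that did not reply form the Failed set, which consider_dead/1 then walks lowest bit first using `lsb/1`. A sketch with the hypothetical names failed_nodes/3 and bitmap_nodes/2:

```prolog
% Hypothetical: nodes of Quorum that are not in Alive.
failed_nodes(Quorum, Alive, Failed) :-
    Failed is Quorum /\ \Alive.

% Hypothetical: list the node ids in a bitmap, lowest first,
% in the same order as consider_dead/1 visits them.
bitmap_nodes(0, []) :- !.
bitmap_nodes(Set, [Node|Nodes]) :-
    Node is lsb(Set),
    Rest is Set /\ \(1 << Node),
    bitmap_nodes(Rest, Nodes).
```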
update_failed(Action, Quorum, Alive) :-
    Failed is Quorum /\ \Alive,
    alive(Alive),
    consider_dead(Failed),
    (   failed(Failed)
    ->  true
    ;   (   clause(failed(_Old), true, Ref)
        ->  asserta(failed(Failed)),
            erase(Ref),
            debug(paxos(node), 'Updated failed quorum to 0x~16r', [Failed])
        ;   asserta(failed(Failed))
        ),
        (   Action == set
        ->  start_replicator
        ;   true
        )
    ).

consider_dead(0) :-
    !.
consider_dead(Failed) :-
    Node is lsb(Failed),
    consider_dead1(Node),
    Rest is Failed /\ \(1<<Node),
    consider_dead(Rest).

consider_dead1(Node) :-
    clause(failed(Node, Last, Score), true, Ref),
    !,
    setting(death_half_life, HalfLife),
    setting(death_score, DeathScore),
    get_time(Now),
    Passed is Now-Last,
    NewScore is Score*(2**(-Passed/HalfLife)) + 10,
    asserta(failed(Node, Now, NewScore)),
    erase(Ref),
    (   NewScore > DeathScore           % dead once the score exceeds death_score
    ->  debug(paxos(node), 'Consider node ~d dead', [Node]),
        paxos_leave(Node)
    ;   true
    ).
consider_dead1(Node) :-
    get_time(Now),
    asserta(failed(Node, Now, 10)).

alive(Bitmap) :-
    (   clause(failed(Node, _Last, _Score), true, Ref),
        Bitmap /\ (1<<Node) =\= 0,
        erase(Ref),
        fail
    ;   true
    ).
 life_quorum(-Quorum, -LifeQuorum) is det
Find the Quorum and the living nodes from the Quorum. This is the set for which we wait. If the LifeQuorum is not a majority we address the whole Quorum.
To be done
- At some point in time we must remove a node from the quorum.
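The majority test and the choice made by life_quorum/2 can be tried out on plain bitmaps. majority/2 below copies the library's definition. life_quorum_sketch/3 is a hypothetical variant that takes the Failed bitmap as an argument instead of reading the failed/1 fact.

```prolog
% A strict majority of Set: more than half of its members.
majority(SubSet, Set) :-
    popcount(SubSet) >= (popcount(Set)+2)//2.

% Use the non-failed part of Quorum if it is still a majority,
% otherwise address the whole Quorum.
life_quorum_sketch(Quorum, Failed, LifeQuorum) :-
    (   Failed =\= 0,
        Live is Quorum /\ \Failed,
        majority(Live, Quorum)
    ->  LifeQuorum = Live
    ;   LifeQuorum = Quorum
    ).
```

For a three-node quorum `0b111`, one failed node still leaves a majority of two, so only the two live nodes are addressed. With two failed nodes, the whole quorum is addressed again.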
life_quorum(Quorum, LifeQuorum) :-
    quorum(Quorum),
    (   failed(Failed),
        Failed \== 0,
        LifeQuorum is Quorum /\ \Failed,
        majority(LifeQuorum, Quorum)
    ->  true
    ;   LifeQuorum = Quorum
    ).


		 /*******************************
		 *        NETWORK STATUS	*
		 *******************************/

:- paxos_admin_key(quorum, Key),
   listen(paxos_changed(Key, Quorum),
          update_quorum(Quorum)).
:- paxos_admin_key(dead, Key),
   listen(paxos_changed(Key, Death),
          update_dead(Death)).

update_quorum(Proposed) :-
    debug(paxos(node), 'Received quorum proposal 0x~16r', [Proposed]),
    quorum(Proposed),
    !.
update_quorum(Proposed) :-
    leaving,
    !,
    update(quorum(Proposed)).
update_quorum(Proposed) :-
    node(Node),
    Proposed /\ (1<<Node) =\= 0,
    !,
    update(quorum(Proposed)).
update_quorum(Proposed) :-
    node(Node),
    NewQuorum is Proposed \/ (1<<Node),
    update(quorum(NewQuorum)),
    debug(paxos(node), 'I''m not in the quorum! Proposing 0x~16r', [NewQuorum]),
    paxos_set_admin_bg(quorum, NewQuorum).

update_dead(Proposed) :-
    debug(paxos(node), 'Received dead proposal 0x~16r', [Proposed]),
    dead(Proposed),
    !.
update_dead(Proposed) :-
    leaving,
    !,
    update(dead(Proposed)).
update_dead(Proposed) :-
    node(Node),
    Proposed /\ (1<<Node) =:= 0,
    !,
    update(dead(Proposed)).
update_dead(Proposed) :-
    node(Node),
    NewDead is Proposed /\ \(1<<Node),
    update(dead(NewDead)),
    paxos_set_admin_bg(dead, NewDead).

update(Clause) :-
    functor(Clause, Name, Arity),
    functor(Generic, Name, Arity),
    (   clause(Generic, true, Ref)
    ->  asserta(Clause),
        erase(Ref)
    ;   asserta(Clause)
    ).
 paxos_property(?Property)
True if Property is a current property for the paxos network. Defined properties are:
paxos_property(node(NodeID)) :-
    node(NodeID).
paxos_property(quorum(Quorum)) :-
    quorum(Quorum).
paxos_property(failed(Nodes)) :-
    failed(Nodes).


		 /*******************************
		 *         INBOUND EVENTS	*
		 *******************************/
 paxos_message(?Message)
Handle inbound actions from our peers. Defined values for Message are:
prepare(+Key, -Node, -Gen, +Value)
A request message to set Key to Value. Returns the current generation at which we have a value (or 0) for Gen and our node id for Node.
accept(+Key, -Node, +Gen, -GenA, +Value)
A request message to set Key to Value if Gen is newer than the generation we have for Key. In that case GenA is Gen. Otherwise we reject using GenA = nack.
changed(+Key, +Gen, +Value, +Acceptors)
The leader got enough accepts for setting Key to Value at Gen. Acceptors is the set of nodes that accepted this value.
learn(+Key, -Node, +Gen, -GenA, +Value)
Request message performing phase one of replication to learner nodes.
learned(+Key, +Gen, +Value, +Acceptors)
Phase two of the replication. Confirm the newly learned knowledge.
retrieve(+Key, -Node, -Gen, -Value)
A request message to retrieve our value for Key. Also provides our node id and the generation.
forget(+Nodes)
Forget the existence of Nodes.
node(-Node, -Quorum, -Dead)
Get this node's view of the network. Node is the (integer) node id of this node, Quorum is its view of the quorum and Dead is its view of the non-responsive nodes.
To be done
- Originally, the changed message was handled by a get and, when that was not successful, by a new set, named paxos_audit. I don't really see why we need this.
paxos_message(prepare(Key,Node,Gen,Value)) :-
    node(Node),
    (   ledger(Key, Gen, _)
    ->  true
    ;   Gen = 0,
        ledger_create(Key, Gen, Value)
    ),
    debug(paxos, 'Prepared ~p-~p@~d', [Key,Value,Gen]).
paxos_message(accept(Key,Node,Gen,GenA,Value)) :-
    node(Node),
    (   ledger_update(Key, Gen, Value)
    ->  debug(paxos, 'Accepted ~p-~p@~d', [Key,Value,Gen]),
        GenA = Gen
    ;   debug(paxos, 'Rejected ~p-~p@~d', [Key,Value,Gen]),
        GenA = nack
    ).
paxos_message(changed(Key,Gen,Value,Acceptors)) :-
    debug(paxos, 'Changed ~p-~p@~d for ~p', [Key, Value, Gen, Acceptors]),
    ledger_update_holders(Key,Gen,Acceptors),
    broadcast(paxos_changed(Key,Value)).
paxos_message(learn(Key,Node,Gen,GenA,Value)) :-
    node(Node),
    debug(paxos, 'Learn ~p-~p@~p?', [Key, Value, Gen]),
    (   ledger_learn(Key,Gen,Value)
    ->  debug(paxos, 'Learned ~p-~p@~d', [Key,Value,Gen]),
        GenA = Gen
    ;   debug(paxos, 'Rejected ~p@~d', [Key, Gen]),
        GenA = nack
    ).
paxos_message(learned(Key,Gen,_Value,Acceptors)) :-
    ledger_update_holders(Key,Gen,Acceptors).
paxos_message(retrieve(Key,Node,K,Value)) :-
    node(Node),
    debug(paxos, 'Retrieving ~p', [Key]),
    ledger(Key,K,Value),
    debug(paxos, 'Retrieved ~p-~p@~d', [Key,Value,K]),
    !.
paxos_message(forget(Nodes)) :-
    ledger_forget(Nodes).
paxos_message(node(Node,Quorum,Dead)) :-
    (   node(Node),
        quorum(Quorum),
        dead(Dead)
    ->  true
    ;   salt(Salt),
        Node = self,
        Quorum = 0,
        Dead = Salt
    ).
paxos_message(claim_node(Node, Ok)) :-
    (   node(Node)
    ->  Ok = false
    ;   Ok = true
    ).
paxos_message(ask(Node, Message)) :-
    node(Node),
    broadcast_request(Message).


		 /*******************************
		 *     KEY-VALUE OPERATIONS	*
		 *******************************/
 paxos_set(+Term) is semidet
Equivalent to paxos_key(Term,Key), paxos_set(Key,Term). I.e., Term is a ground compound term and its key is the name/arity pair. This version provides compatibility with older versions of this library.
 paxos_set(+Key, +Value) is semidet
 paxos_set(+Key, +Value, +Options) is semidet
Negotiates to have Key-Value recorded in the ledger for each of the quorum's members. This predicate succeeds if the quorum unanimously accepts the proposed term. If no such entry exists in the Paxon's ledger, then one is silently created. paxos_set/1 will retry the transaction several times (default: 20) before failing. Failure is rare and is usually the result of a collision of two or more writers writing to the same term at precisely the same time. On failure, it may be useful to wait some random period of time, and then retry the transaction. By specifying a retry count of zero, paxos_set/3 will succeed iff the first ballot succeeds.

On success, paxos_set/1 will also broadcast the term paxos_changed(Key,Value), to the quorum.

Options processed:

retry(Retries)
is a non-negative integer specifying the number of retries that will be performed before a set is abandoned. Defaults to the setting max_sets (20).
timeout(+Seconds)
Max time to wait for the forum to reply. Defaults to the setting response_timeout (0.020, 20ms).
Arguments:
Term- is a ground compound term.
To be done
- If the Value is already current, should we simply do nothing?
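The retry behaviour described above follows a common Prolog idiom, which paxos_set/3 and paxos_get/3 use as `between(0, Retries, _), ..., !`. A minimal sketch with the hypothetical name with_retries/2:

```prolog
% Hypothetical: try Goal at most Retries+1 times and commit to the
% first success; fail if every attempt fails.
with_retries(Retries, Goal) :-
    between(0, Retries, _),
    call(Goal),
    !.
```

Because the cut follows the goal, the first successful attempt commits and between/3 generates no further attempts. If every attempt fails, the whole call fails, which matches the semidet declaration of paxos_set/3.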
paxos_set(Term) :-
    paxos_key(Term, Key),
    paxos_set(Key, Term, []).

paxos_set(Key, Value) :-
    paxos_set(Key, Value, []).

paxos_set(Key, Value, Options) :-
    must_be(ground, Key-Value),
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(TMO), Options, TMO),
    apply_default(Retries, max_sets),
    apply_default(TMO, response_timeout),
    paxos_message(prepare(Key,Np,Rp,Value), TMO, Prepare),
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      Alive \== 0,
      debug(paxos, 'Set: ~p -> ~p', [Key, Value]),
      collect(Quorum, false, Np, Rp, Prepare, Rps, PrepNodes),
      debug(paxos, 'Set: quorum: 0x~16r, prepared by 0x~16r, gens ~p',
            [Quorum, PrepNodes, Rps]),
      majority(PrepNodes, Quorum),
      max_list(Rps, K),
      succ(K, K1),
      paxos_message(accept(Key,Na,K1,Ra,Value), TMO, Accept),
      collect(Alive, Ra == nack, Na, Ra, Accept, Ras, AcceptNodes),
      majority(AcceptNodes, Quorum),
      intersecting(PrepNodes, AcceptNodes),
      c_element(Ras, K, K1),
      broadcast(paxos(log(Key,Value,AcceptNodes,K1))),
      paxos_message(changed(Key,K1,Value,AcceptNodes), -, Changed),
      broadcast(Changed),
      update_failed(set, Quorum, AcceptNodes),
    !.

apply_default(Var, Setting) :-
    var(Var),
    !,
    setting(Setting, Var).
apply_default(_, _).

majority(SubSet, Set) :-
    popcount(SubSet) >= (popcount(Set)+2)//2.

intersecting(Set1, Set2) :-
    Set1 /\ Set2 =\= 0.
 collect(+Quorum, :Stop, ?Node, ?Template, ?Message, -Result, -NodeSet) is semidet
Perform a broadcast request using Message. Node and Template share with Message and extract the replying node and the result value from Message. Result is the list of instantiations for Template received and NodeSet is the set (bitmask) of Node values that replied, i.e. |NodeSet| is length(Result). The transfer stops if all members of the set Quorum responded or the configured timeout passed.
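collect/7 processes replies incrementally as broadcast_request/1 delivers them, using nb_linkarg/3 and nb_setarg/3 so that the results survive backtracking. The bookkeeping itself is easier to see on a list of Node-Value replies that has already arrived. replies_nodeset/3 and all_replied/2 below are hypothetical names for this non-incremental sketch.

```prolog
:- use_module(library(apply)).

% Hypothetical: split already-received Node-Value replies into the list
% of values and the bitmap of nodes that replied.
replies_nodeset(Replies, Values, NodeSet) :-
    foldl(add_reply, Replies, Values, 0, NodeSet).

add_reply(Node-Value, Value, Set0, Set) :-
    Set is Set0 \/ (1 << Node).

% The same stop condition as collect/7: every member of Quorum replied.
all_replied(Quorum, NodeSet) :-
    Quorum /\ NodeSet =:= Quorum.
```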
collect(Quorum, Stop, Node, Template, Message, Result, NodeSet) :-
    State = state(0),
    L0 = [dummy|_],
    Answers = list(L0),
    (   broadcast_request(Message),
        debug(paxos(request), 'broadcast_request: ~p', [Message]),
        (   Stop
        ->  !,
            fail
        ;   true
        ),
        duplicate_term(Template, Copy),
        NewLastCell = [Copy|_],
        arg(1, Answers, LastCell),
        nb_linkarg(2, LastCell, NewLastCell),
        nb_linkarg(1, Answers, NewLastCell),
        arg(1, State, Replied0),
        Replied is Replied0 \/ (1<<Node),
        nb_setarg(1, State, Replied),
        Quorum /\ Replied =:= Quorum
    ->  true
    ;   true
    ),
    arg(1, State, NodeSet),
    arg(1, Answers, [_]),               % close the answer list
    L0 = [_|Result].
 paxos_quorum_ask(?Template, +Message, -Result, +Options)
Ask the paxos forum for their opinion. This predicate is not really part of the paxos protocol, but notably reuses the quorum maintenance mechanism of this library for asking questions to the quorum (cluster). Message is the message to be asked. Result is a list of copies of Template from the quorum. Options:
timeout(+Seconds)
Max time to wait for a reply. Default is the setting response_timeout.
node(?Node)
Can be used to include the replying node into Template.
quorum(?Quorum)
Set/query the interviewed quorum as a bitmask
paxos_quorum_ask(Template, Message, Result, Options) :-
    option(timeout(TMO), Options, TMO),
    option(node(Node), Options, _),
    option(quorum(Quorum), Options, Quorum),
    apply_default(TMO, response_timeout),
    (   var(Quorum)
    ->  life_quorum(Quorum, _Alive)
    ;   true
    ),
    paxos_message(ask(Node, Message), TMO, BroadcastMessage),
    collect(Quorum, false, Node, Template, BroadcastMessage, Result, _PrepNodes).
 paxos_get(?Term) is semidet
Equivalent to paxos_key(Term,Key), paxos_get(Key,Term). I.e., Term is a compound term and its key is the name/arity pair. This version provides compatibility with older versions of this library.
 paxos_get(+Key, -Value) is semidet
 paxos_get(+Key, +Value, +Options) is semidet
Unifies Value with the entry retrieved from the Paxon's ledger. If no such entry exists in the member's local cache, then the quorum is asked to provide a value, which is verified for consistency. An implied paxos_set/2 follows. This predicate succeeds if a term with the same functor and arity exists in the Paxon's ledger, and fails otherwise.

Options processed:

retry(Retries)
is a non-negative integer specifying the number of retries that will be performed before a get is abandoned. Defaults to the setting max_gets (5).
timeout(+Seconds)
Max time to wait for the forum to reply. Defaults to the setting response_timeout (0.020, 20ms).
Arguments:
Term- is a compound. Any unbound variables are unified with those provided in the ledger entry.
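The consistency check in paxos_get/3 counts identical Line-Value replies. It accepts the most frequent reply only if that reply comes from a majority of the addressed nodes. The sketch below uses the hypothetical names best_reply/3 and reply_accepted/2, and relies on clumped/2 and max_member/2 from library(lists) instead of the count_votes/2 helper in the code.

```prolog
:- use_module(library(lists)).

% Hypothetical: the most frequent reply and how often it occurred.
best_reply(Replies, Best, Count) :-
    msort(Replies, Sorted),
    clumped(Sorted, Counted),                 % Reply-Count pairs
    findall(C-R, member(R-C, Counted), ByCount),
    max_member(Count-Best, ByCount).          % highest count wins

% Same threshold as paxos_get/3: a majority of the Addressed bitmap.
reply_accepted(Count, Addressed) :-
    Count >= (popcount(Addressed)+2)//2.
```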
paxos_get(Term) :-
    paxos_key(Term, Key),
    paxos_get(Key, Term, []).
paxos_get(Key, Value) :-
    paxos_get(Key, Value, []).

paxos_get(Key, Value, _) :-
    ledger(Key, _Line, Value),
    !.
paxos_get(Key, Value, Options) :-
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(TMO), Options, TMO),
    apply_default(Retries, max_gets),
    apply_default(TMO, response_timeout),
    Msg = Line-Value,
    paxos_message(retrieve(Key,Nr,Line,Value), TMO, Retrieve),
    node(Node),
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      QuorumA is Alive /\ \(1<<Node),
      collect(QuorumA, false, Nr, Msg, Retrieve, Terms, RetrievedNodes),
      debug(paxos, 'Retrieved: ~p from 0x~16r', [Terms, RetrievedNodes]),
      highest_vote(Terms, _Line-MajorityValue, Count),
      debug(paxos, 'Best: ~p with ~d votes', [MajorityValue, Count]),
      Count >= (popcount(QuorumA)+2)//2,
      debug(paxos, 'Retrieve: accept ~p', [MajorityValue]),
      update_failed(get, Quorum, RetrievedNodes),
      paxos_set(Key, MajorityValue),    % Is this needed?
    !.

highest_vote(Terms, Term, Count) :-
    msort(Terms, Sorted),
    count_votes(Sorted, Counted),
    sort(1, >, Counted, [Count-Term|_]).

count_votes([], []).
count_votes([H|T0], [N-H|T]) :-
    count_same(H, T0, 1, N, R),
    count_votes(R, T).

count_same(H, [Hc|T0], C0, C, R) :-
    H == Hc,
    !,
    C1 is C0+1,
    count_same(H, T0, C1, C, R).
count_same(_, R, C, C, R).
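The majority test used by paxos_get/3 can be tried in isolation. The following is a minimal sketch, assuming SWI-Prolog: it copies the vote-counting helpers and wraps the threshold check in majority_value/3, a hypothetical predicate that is not part of the library. It uses sort/4 with @>= (descending, duplicates kept) rather than the > used above, so that replies with equal vote counts are all retained.

```prolog
% Standalone copies of the vote-counting helpers shown above.
highest_vote(Terms, Term, Count) :-
    msort(Terms, Sorted),                  % group identical replies
    count_votes(Sorted, Counted),          % list of Count-Term pairs
    sort(1, @>=, Counted, [Count-Term|_]). % highest count first

count_votes([], []).
count_votes([H|T0], [N-H|T]) :-
    count_same(H, T0, 1, N, R),
    count_votes(R, T).

count_same(H, [Hc|T0], C0, C, R) :-
    H == Hc,
    !,
    C1 is C0+1,
    count_same(H, T0, C1, C, R).
count_same(_, R, C, C, R).

% majority_value(+Replies, +QuorumA, -Value): hypothetical wrapper.
% Replies are Line-Value pairs; QuorumA is the bitmask of nodes asked.
% Succeeds if the most popular reply reaches the threshold that
% paxos_get/3 applies: (popcount(QuorumA)+2)//2 votes.
majority_value(Replies, QuorumA, Value) :-
    highest_vote(Replies, _Line-Value, Count),
    Count >= (popcount(QuorumA)+2)//2.
```

With three peers asked (QuorumA = 0b111) the threshold is (3+2)//2 = 2, so two identical replies suffice, while three different replies are rejected.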
 paxos_key(+Term, -Key) is det
Compatibility to allow for paxos_get/1, paxos_set/1, etc. The key of a compound term is a term '$c'(Name,Arity). Note that we do not use Name/Arity because X/Y is naturally used to organize keys as hierarchical paths.
paxos_key(Compound, '$c'(Name,Arity)) :-
    compound(Compound), !,
    compound_name_arity(Compound, Name, Arity).
paxos_key(Compound, _) :-
    must_be(compound, Compound).
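A small standalone copy of paxos_key/2 makes the mapping concrete. This sketch assumes SWI-Prolog; it shows that point(1,2) maps to '$c'(point,2) rather than point/2, so it cannot collide with a user key written as the path point/2, and that non-compound terms raise a type error via must_be/2.

```prolog
% Standalone copy of paxos_key/2 for experimentation.
paxos_key(Compound, '$c'(Name,Arity)) :-
    compound(Compound), !,
    compound_name_arity(Compound, Name, Arity).
paxos_key(Compound, _) :-
    must_be(compound, Compound).   % raises type_error(compound, X)
```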


		 /*******************************
		 *          REPLICATION		*
		 *******************************/
 start_replicator
Start or signal the replicator thread that there may be outstanding replication work. This is the case if
start_replicator :-
    catch(thread_send_message(paxos_replicator, run),
          error(existence_error(_,_),_),
          fail),
    !.
start_replicator :-
    catch(thread_create(replicator, _,
                        [ alias(paxos_replicator),
                          detached(true)
                        ]),
          error(permission_error(_,_,_),_),
          true).

replicator :-
    setting(replication_rate, ReplRate),
    ReplSleep is 1/ReplRate,
    node(Node),
    debug(paxos(replicate), 'Starting replicator', []),
    State = state(idle),
    repeat,
      quorum(Quorum),
      dead(Dead),
      LifeQuorum is Quorum /\ \Dead,
      (   LifeQuorum /\ \(1<<Node) =:= 0
      ->  debug(paxos(replicate),
                'Me: ~d, Quorum: 0x~16r, Dead: 0x~16r: I''m alone, waiting ...',
                [Node, Quorum, Dead]),
          thread_get_message(_)
      ;   (   paxos_replicate_key(LifeQuorum, Key, [])
          ->  replicated(State, key(Key)),
              thread_self(Me),
              thread_get_message(Me, _, [timeout(ReplSleep)])
          ;   replicated(State, idle),
              thread_get_message(_)
          )
      ),
      fail.

replicated(State, key(_Key)) :-
    arg(1, State, idle),
    !,
    debug(paxos(replicate), 'Start replicating ...', []),
    nb_setarg(1, State, 1).
replicated(State, key(_Key)) :-
    !,
    arg(1, State, C0),
    C is C0+1,
    nb_setarg(1, State, C).
replicated(State, idle) :-
    arg(1, State, idle),
    !.
replicated(State, idle) :-
    arg(1, State, Count),
    debug(paxos(replicate), 'Replicated ~D keys', [Count]),
    nb_setarg(1, State, idle).
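The replicator decides whether it is alone using plain bitmask arithmetic, where bit N stands for node N. The sketch below isolates that computation; live_peers/4 and alone/3 are hypothetical helper names, not library predicates, and the sketch assumes SWI-Prolog's 0b binary literals.

```prolog
% live_peers(+Node, +Quorum, +Dead, -Peers): hypothetical helper that
% mirrors the arithmetic in replicator/0.
live_peers(Node, Quorum, Dead, Peers) :-
    LifeQuorum is Quorum /\ \Dead,        % quorum members not dead
    Peers is LifeQuorum /\ \(1<<Node).    % ... excluding this node

% alone(+Node, +Quorum, +Dead): true if no live peer remains, the
% case in which the replicator just blocks in thread_get_message/1.
alone(Node, Quorum, Dead) :-
    live_peers(Node, Quorum, Dead, Peers),
    Peers =:= 0.
```

For example, with nodes 0, 1 and 3 in the quorum (0b1011) and node 1 dead (0b0010), node 0 still has node 3 (0b1000) as a live peer.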
 paxos_replicate_key(+Nodes:bitmap, ?Key, +Options) is det
Replicate a Key to Nodes. If Key is unbound, a random key is selected.
timeout(+Seconds)
Max time to wait for the forum to reply. Defaults to the setting response_timeout (0.020, 20ms).
paxos_replicate_key(Nodes, Key, Options) :-
    replication_key(Nodes, Key),
    option(timeout(TMO), Options, TMO),
    apply_default(TMO, response_timeout),
    ledger_current(Key, Gen, Value, Holders),
    paxos_message(learn(Key,Na,Gen,Ga,Value), TMO, Learn),
    collect(Nodes, Ga == nack, Na, Ga, Learn, _Gas, LearnedNodes),
    NewHolders is Holders \/ LearnedNodes,
    paxos_message(learned(Key,Gen,Value,NewHolders), -, Learned),
    broadcast(Learned),
    update_failed(replicate, Nodes, LearnedNodes).

replication_key(_Nodes, Key) :-
    ground(Key),
    !.
replication_key(Nodes, Key) :-
    (   Nth is 1+random(popcount(Nodes))
    ;   Nth = 1
    ),
    call_nth(needs_replicate(Nodes, Key), Nth),
    !.

needs_replicate(Nodes, Key) :-
    ledger_current(Key, _Gen, _Value, Holders),
    Nodes /\ \Holders =\= 0,
    \+ paxos_admin_key(_, Key).
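replication_key/2 picks a pseudo-random key by asking call_nth/2 for a random solution index and, if that index exceeds the actual number of solutions, falling back to the first solution. The sketch below extracts that idiom into random_solution/2, a hypothetical helper; as in the original, Bound is only an estimate of the number of solutions (popcount(Nodes) above) and must be positive for random/1.

```prolog
:- use_module(library(solution_sequences)).   % call_nth/2

:- meta_predicate random_solution(+, 0).

% random_solution(+Bound, :Goal): hypothetical helper. Try a random
% solution index in 1..Bound; if Goal has fewer solutions, call_nth/2
% fails and the disjunction retries with the first solution.
random_solution(Bound, Goal) :-
    (   Nth is 1+random(Bound)
    ;   Nth = 1
    ),
    call_nth(Goal, Nth),
    !.
```

The fallback keeps the predicate cheap: it never enumerates all candidates to count them, at the cost of favouring the first solution when the estimate is too high.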


		 /*******************************
		 *      KEY CHANGE EVENTS	*
		 *******************************/
 paxos_on_change(?Term, :Goal) is det
 paxos_on_change(?Key, ?Value, :Goal) is det
Executes the specified Goal when Key changes. paxos_on_change/2 listens for paxos_changed(Key,Value) notifications for Key, which are emitted as the result of successful paxos_set/3 transactions. When one is received for Key, Goal is executed in a separate thread.
Arguments:
Term- is a compound, identical to that used for paxos_get/1.
Goal- is one of:
  • a callable atom or term, or
  • the atom ignore, which causes monitoring for Term to be discontinued.
paxos_on_change(Term, Goal) :-
    paxos_key(Term, Key),
    paxos_on_change(Key, Term, Goal).

paxos_on_change(Key, Value, Goal) :-
    Goal = _:Plain,
    must_be(callable, Plain),
    (   Plain == ignore
    ->  unlisten(paxos_user, paxos_changed(Key,Value))
    ;   listen(paxos_user, paxos_changed(Key,Value),
               key_changed(Key, Value, Goal)),
        paxos_initialize
    ).

key_changed(_Key, _Value, Goal) :-
    E = error(_,_),
    catch(thread_create(Goal, _, [detached(true)]),
          E, key_error(E)).

key_error(error(permission_error(create, thread, _), _)) :-
    !.
key_error(E) :-
    print_message(error, E).
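paxos_on_change/3 is built on listen/3 and unlisten/2 from library(broadcast). The sketch below reproduces the register/unregister logic with a hypothetical changed/2 template and demo_user listener standing in for paxos_changed/2 and paxos_user. Unlike key_changed/3, which starts a detached thread, this version runs the goal synchronously inside broadcast/1 to keep the example deterministic.

```prolog
:- use_module(library(broadcast)).

:- dynamic seen/2.

% on_change(+Key, ?Value, +Goal): hypothetical, synchronous analogue
% of paxos_on_change/3. Passing the atom ignore removes the listener.
on_change(Key, Value, Goal) :-
    (   Goal == ignore
    ->  unlisten(demo_user, changed(Key, Value))
    ;   listen(demo_user, changed(Key, Value), Goal)
    ).

% Example goal: remember every value we were notified about.
record(Key, Value) :-
    assertz(seen(Key, Value)).
```

Because the template and goal share the Value variable when registered, each broadcast(changed(color, red)) binds it in a fresh copy and calls record(color, red).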


		 /*******************************
		 *            HOOKS		*
		 *******************************/
 node(-Node) is det
Get the node ID for this paxos node.
 quorum(-Quorum) is det
Get the current quorum as a bitmask.
 paxos_message(+PaxOS, +TimeOut, -BroadcastMessage) is det
Transform a basic PaxOS message into a message for the broadcasting service. This predicate is hooked by paxos_message_hook/3 with the same signature.
Arguments:
TimeOut- is either the atom - or a time in seconds.
paxos_message(Paxos:From, TMO, Message) :-
    paxos_message_raw(paxos(Paxos):From, TMO, Message).
paxos_message(Paxos, TMO, Message) :-
    paxos_message_raw(paxos(Paxos), TMO, Message).

paxos_message_raw(Message, TMO, WireMessage) :-
    paxos_message_hook(Message, TMO, WireMessage),
    !.
paxos_message_raw(Message, TMO, WireMessage) :-
    throw(error(mode_error(det, fail,
                           paxos:paxos_message_hook(Message, TMO, WireMessage)), _)).
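paxos_message_raw/3 requires its hook to succeed and reports a failing hook as a mode_error. The sketch below reproduces that contract against a local dynamic hook. The names message_hook/3, message_raw/3 and install_demo_hook/0, and the send/1 and request/2 wire terms, are all hypothetical; they do not describe the wire format of any real broadcast transport.

```prolog
:- dynamic message_hook/3.

% message_raw(+Message, +TimeOut, -Wire): same shape as
% paxos_message_raw/3, but calling the local hypothetical hook.
message_raw(Message, TMO, Wire) :-
    message_hook(Message, TMO, Wire),
    !.
message_raw(Message, TMO, Wire) :-
    throw(error(mode_error(det, fail,
                           message_hook(Message, TMO, Wire)), _)).

% Hypothetical transport: a TimeOut of - means no replies are
% collected; otherwise the timeout travels with the request.
install_demo_hook :-
    retractall(message_hook(_,_,_)),
    assertz((message_hook(M, -, send(M)) :- !)),
    assertz(message_hook(M, TMO, request(M, TMO))).
```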


		 /*******************************
		 *           STORAGE		*
		 *******************************/
 paxos_ledger_hook(+Action, ?Key, ?Gen, ?Value, ?Holders)
Hook called for all operations on the ledger. Defined actions are:
current
Enumerate our ledger content.
get
Get a single value from our ledger.
create
Create a new key in our ledger.
accept
Accept a newly proposed value for a key. Failure causes the library to send a NACK message.
set
Final acceptance of Key@Gen, providing the holders that accepted the new value.
learn
Accept new keys in a new node or node that has been offline for some time.
:- dynamic
    paxons_ledger/4.                    % Key, Gen, Value, Holders
 ledger_current(?Key, ?Gen, ?Value, ?Holders) is nondet
True when Key is a known key in my ledger.
ledger_current(Key, Gen, Value, Holders) :-
    paxos_ledger_hook(current, Key, Gen, Value, Holders).
ledger_current(Key, Gen, Value, Holders) :-
    paxons_ledger(Key, Gen, Value, Holders),
    valid(Holders).
 ledger(+Key, -Gen, -Value) is semidet
True if the ledger has Value associated with Key at generation Gen. Note that if the value has not yet been acknowledged by the leader, we should not use it.
ledger(Key, Gen, Value) :-
    paxos_ledger_hook(get, Key, Gen, Value0, Holders),
    !,
    valid(Holders),
    Value = Value0.
ledger(Key, Gen, Value) :-
    paxons_ledger(Key, Gen, Value0, Holders),
    valid(Holders),
    !,
    Value = Value0.
 ledger_create(+Key, +Gen, +Value) is det
Create a new Key-Value pair at generation Gen. This is executed during the preparation phase.
ledger_create(Key, Gen, Value) :-
    paxos_ledger_hook(create, Key, Gen, Value, -),
    !.
ledger_create(Key, Gen, Value) :-
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, created(Now))).
 ledger_update(+Key, +Gen, +Value) is semidet
Update Key to Value if the current generation is older than Gen. This reflects the accept phase of the protocol.
ledger_update(Key, Gen, Value) :-
    paxos_ledger_hook(accept, Key, Gen, Value, -),
    !.
ledger_update(Key, Gen, Value) :-
    paxons_ledger(Key, Gen0, _Value, _Holders),
    !,
    Gen > Gen0,
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, accepted(Now))),
    (   Gen0 == 0
    ->  retractall(paxons_ledger(Key, Gen0, _, _))
    ;   true
    ).
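The non-hook clause of ledger_update/3 accepts a value only for a strictly newer generation and drops the generation-0 placeholder once it is superseded. The sketch below replays that logic on demo_ledger/4, a hypothetical table standing in for paxons_ledger/4; because new entries are added with asserta/1, the first matching clause is always the newest generation.

```prolog
:- dynamic demo_ledger/4.            % Key, Gen, Value, Holders

% update(+Key, +Gen, +Value): mirrors the non-hook clause of
% ledger_update/3 on the hypothetical demo_ledger/4 table.
update(Key, Gen, Value) :-
    demo_ledger(Key, Gen0, _Value, _Holders),  % newest entry first
    !,
    Gen > Gen0,                                % only newer generations
    get_time(Now),
    asserta(demo_ledger(Key, Gen, Value, accepted(Now))),
    (   Gen0 == 0                              % drop gen-0 placeholder
    ->  retractall(demo_ledger(Key, Gen0, _, _))
    ;   true
    ).
```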
 ledger_update_holders(+Key, +Gen, +Holders) is det
The leader acknowledged that Key@Gen represents a valid new
ledger_update_holders(Key, Gen, Holders) :-
    paxos_ledger_hook(set, Key, Gen, _, Holders),
    !.
ledger_update_holders(Key, Gen, Holders) :-
    clause(paxons_ledger(Key, Gen, Value, Holders0), true, Ref),
    !,
    (   Holders0 == Holders
    ->  true
    ;   asserta(paxons_ledger(Key, Gen, Value, Holders)),
        erase(Ref)
    ),
    clean_key(Holders0, Key, Gen).

clean_key(Holders, _Key, _Gen) :-
    valid(Holders),
    !.
clean_key(_, Key, Gen) :-
    (   clause(paxons_ledger(Key, Gen0, _Value, _Holders0), true, Ref),
        Gen0 < Gen,
        erase(Ref),
        fail
    ;   true
    ).
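An entry only becomes visible to ledger/3 once its Holders field is an integer bitmask; while it still holds a created/1, accepted/1 or learned/1 timestamp, valid/1 fails and the entry is skipped. The sketch below walks through that lifecycle on demo_ledger/4, a hypothetical table standing in for paxons_ledger/4, using simplified copies of the non-hook clauses (clean_key/3 is omitted).

```prolog
:- dynamic demo_ledger/4.            % Key, Gen, Value, Holders

valid(Holders) :-
    integer(Holders).

% create/3, lookup/3 and set_holders/3 mirror the non-hook clauses of
% ledger_create/3, ledger/3 and ledger_update_holders/3.
create(Key, Gen, Value) :-
    get_time(Now),
    asserta(demo_ledger(Key, Gen, Value, created(Now))).

lookup(Key, Gen, Value) :-
    demo_ledger(Key, Gen, Value0, Holders),
    valid(Holders),                  % skip unacknowledged entries
    !,
    Value = Value0.

set_holders(Key, Gen, Holders) :-
    clause(demo_ledger(Key, Gen, Value, Holders0), true, Ref),
    !,
    (   Holders0 == Holders
    ->  true
    ;   asserta(demo_ledger(Key, Gen, Value, Holders)),
        erase(Ref)                   % replace the old clause
    ).
```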
 ledger_learn(+Key, +Gen, +Value) is semidet
We received a learn event.
ledger_learn(Key,Gen,Value) :-
    paxos_ledger_hook(learn, Key, Gen, Value, -),
    !.
ledger_learn(Key,Gen,Value) :-
    paxons_ledger(Key, Gen0, Value0, _Holders),
    !,
    (   Gen == Gen0,
        Value == Value0
    ->  true
    ;   Gen > Gen0
    ->  get_time(Now),
        asserta(paxons_ledger(Key, Gen, Value, learned(Now)))
    ).
ledger_learn(Key,Gen,Value) :-
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, learned(Now))).
 ledger_forget(+Nodes) is det
Remove Nodes from all ledgers. This is executed in a background thread.
ledger_forget(Nodes) :-
    catch(thread_create(ledger_forget_threaded(Nodes), _,
                        [ detached(true)
                        ]),
          error(permission_error(create, thread, _), _),
          true).

ledger_forget_threaded(Nodes) :-
    debug(paxos(node), 'Forgetting 0x~16r', [Nodes]),
    forall(ledger_current(Key, Gen, _Value, Holders),
           ledger_forget(Nodes, Key, Gen, Holders)),
    debug(paxos(node), 'Forgotten 0x~16r', [Nodes]).

ledger_forget(Nodes, Key, Gen, Holders) :-
    NewHolders is Holders /\ \Nodes,
    (   NewHolders \== Holders,
        ledger_update_holders(Key, Gen, NewHolders)
    ->  true
    ;   true
    ).

valid(Holders) :-
    integer(Holders).
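Forgetting nodes clears their bits from each entry's holder bitmask, and ledger_forget/4 only writes back entries whose bitmask actually changed. The sketch below isolates that step on a plain list of Key-Holders pairs; forget_nodes/3 and changed_entries/3 are hypothetical helpers, not library predicates.

```prolog
% forget_nodes(+Nodes, +Holders, -NewHolders): clear every bit of
% Holders that is set in Nodes, as ledger_forget/4 does.
forget_nodes(Nodes, Holders, NewHolders) :-
    NewHolders is Holders /\ \Nodes.

% changed_entries(+Nodes, +Entries, -Changed): Entries is a list of
% Key-Holders pairs; Changed holds Key-NewHolders only for the keys
% whose holder set shrinks, i.e. the ones that would be updated.
changed_entries(Nodes, Entries, Changed) :-
    findall(Key-New,
            ( member(Key-Old, Entries),
              forget_nodes(Nodes, Old, New),
              New \== Old
            ),
            Changed).
```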


		 /*******************************
		 *             UTIL		*
		 *******************************/
 c_element(+NewList, +Old, -Value)
A Muller c-element is a logic block used in asynchronous logic. Its output assumes the value of its inputs iff all of its inputs are identical. Otherwise, the output retains its previous value.
c_element([New | More], _Old, New) :-
    forall(member(N, More), N == New),
    !.
c_element(_List, Old, Old).
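A standalone copy of c_element/3 shows the behaviour described above: the output switches to the common input value only when all inputs agree, and otherwise keeps the old value. An empty input list also keeps the old value, because only the second clause matches it.

```prolog
% Standalone copy of c_element/3.
c_element([New | More], _Old, New) :-
    forall(member(N, More), N == New),  % all inputs agree
    !.
c_element(_List, Old, Old).             % otherwise keep the old value
```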
 arg_union(+Arg, +ListOfTerms, -Set) is det
Get all the nth args from ListOfTerms and do a set union on the result.
arg_union(Arg, NodeStatusList, Set) :-
    maplist(arg(Arg), NodeStatusList, Sets),
    list_union(Sets, Set).

list_union(Sets, Set) :-
    list_union(Sets, 0, Set).

list_union([], Set, Set).
list_union([H|T], Set0, Set) :-
    Set1 is Set0 \/ H,
    list_union(T, Set1, Set).
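Because the "sets" here are node bitmasks, the set union in arg_union/3 is a bitwise OR folded over the extracted arguments, starting from the empty set 0. The standalone copy below can be queried directly; the s/2 status terms used in the checks are hypothetical examples, not the library's node status format.

```prolog
% Standalone copies of arg_union/3 and list_union/2,3.
arg_union(Arg, NodeStatusList, Set) :-
    maplist(arg(Arg), NodeStatusList, Sets),  % collect Arg-th bitmasks
    list_union(Sets, Set).

list_union(Sets, Set) :-
    list_union(Sets, 0, Set).                 % 0 is the empty set

list_union([], Set, Set).
list_union([H|T], Set0, Set) :-
    Set1 is Set0 \/ H,                        % bitwise OR accumulates
    list_union(T, Set1, Set).
```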