View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jeffrey Rosenwald and Jan Wielemaker
    4    E-mail:        jeffrose@acm.org
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2012-2013, Jeffrey Rosenwald
    7		   2018-2020, CWI Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(udp_broadcast,
   37          [ udp_broadcast_initialize/2,         % +IPAddress, +Options
   38            udp_broadcast_close/1,		% +Scope
   39
   40            udp_peer_add/2,                     % +Scope, +IP
   41            udp_peer_del/2,                     % +Scope, ?IP
   42            udp_peer/2                          % +Scope, -IP
   43          ]).   44:- autoload(library(apply),[maplist/2,maplist/3]).   45:- autoload(library(backcomp),[thread_at_exit/1]).   46:- autoload(library(broadcast),
   47	    [broadcast_request/1,broadcast/1,listening/3,listen/3]).   48:- use_module(library(debug),[debug/3]).   49:- autoload(library(error),
   50	    [must_be/2,syntax_error/1,domain_error/2,existence_error/2]).   51:- autoload(library(option),[option/3]).   52:- autoload(library(socket),
   53	    [ tcp_close_socket/1,
   54	      udp_socket/1,
   55	      tcp_bind/2,
   56	      tcp_getopt/2,
   57	      tcp_setopt/2,
   58	      udp_receive/4,
   59	      udp_send/4
   60	    ]).   61
   62
   63% :- debug(udp(broadcast)).
   64
   65/** <module> A UDP broadcast proxy
   66
   67SWI-Prolog's broadcast library provides a  means   that  may  be used to
   68facilitate publish and subscribe communication regimes between anonymous
   69members of a community of interest.  The   members  of the community are
   70however, necessarily limited to a  single   instance  of Prolog. The UDP
   71broadcast library removes that restriction.   With  this library loaded,
   72any member on your local IP subnetwork that also has this library loaded
   73may hear and respond to your broadcasts.
   74
   75This library support three styles of networking as described below. Each
   76of these networks have their own   advantages  and disadvantages. Please
   77study the literature to understand the consequences.
   78
   79  $ broadcast :
   80  Broadcast messages are sent to the LAN subnet. The broadcast
   81  implementation uses two UDP ports: a public to address the whole
   82  group and a private one to address a specific node.  Broadcasting
   83  is generally a good choice if the subnet is small and traffic is
   84  low.
   85
   86  $ unicast :
   87  Unicast sends copies of packages to known peers.  Unicast networks
   88  can easily be routed.  The unicast version uses a single UDP port
   89  per node.  Unicast is generally a good choice for a small party,
   90  in particular if the peers are in different networks.
   91
   92  $ multicast :
   93  Multicast is like broadcast, but it can be configured to
   94  work accross networks and may work more efficiently on VLAN networks.
   95  Like the broadcast setup, two UDP ports are used.  Multicasting can
   96  in general deliver the most efficient LAN and WAN networks, but
   97  requires properly configured routing between the peers.
   98
   99After initialization and, in the case   of  a _unicast_ network managing
  100the  set  of  peers,   communication    happens   through   broadcast/1,
  101broadcast_request/1 and listen/1,2,3.
  102
  103A broadcast/1 or broadcast_request/1 of the   shape  udp(Scope, Term) or
  104udp(Scope, Term, TimeOut) is forwarded over the UDP network to all peers
  105that joined the same `Scope`.  To   prevent  the  potential for feedback
  106loops, only the plain `Term`  is   broadcasted  locally.  The timeout is
  107optional. It specifies the amount to time  to wait for replies to arrive
  108in response to a  broadcast_request/1.  The   default  period  is  0.250
  109seconds. The timeout is ignored for broadcasts.
  110
  111An example of three separate processes   cooperating in the same _scope_
  112called `peers`:
  113
  114==
  115Process A:
  116
  117   ?- listen(number(X), between(1, 5, X)).
  118   true.
  119
  120   ?-
  121
  122Process B:
  123
  124   ?- listen(number(X), between(7, 9, X)).
  125   true.
  126
  127   ?-
  128
  129Process C:
  130
  131   ?- findall(X, broadcast_request(udp(peers, number(X))), Xs).
  132   Xs = [1, 2, 3, 4, 5, 7, 8, 9].
  133
  134   ?-
  135==
  136
  137It is also  possible  to  carry  on   a  private  dialog  with  a single
  138responder. To do this, you supply a   compound of the form, Term:PortId,
  139to a UDP scoped broadcast/1 or  broadcast_request/1, where PortId is the
  140ip-address and port-id of  the  intended   listener.  If  you  supply an
  141unbound variable, PortId, to broadcast_request, it  will be unified with
  142the address of the listener  that  responds   to  Term.  You  may send a
  143directed broadcast to a specific member by simply providing this address
  144in a similarly structured compound  to   a  UDP  scoped broadcast/1. The
  145message is sent via unicast to that member   only by way of the member's
  146broadcast listener. It is received by  the   listener  just as any other
  147broadcast would be. The listener does not know the difference.
  148
  149For example, in order to discover who responded with a particular value:
  150
  151==
  152Host B Process 1:
  153
  154   ?- listen(number(X), between(1, 5, X)).
  155   true.
  156
  157   ?-
  158
  159Host A Process 1:
  160
  161
  162   ?- listen(number(X), between(7, 9, X)).
  163   true.
  164
  165   ?-
  166
  167Host A Process 2:
  168
  169   ?- listen(number(X), between(1, 5, X)).
  170   true.
  171
  172   ?- bagof(X, broadcast_request(udp(peers,number(X):From,1)), Xs).
  173   From = ip(192, 168, 1, 103):34855,
  174   Xs = [7, 8, 9] ;
  175   From = ip(192, 168, 1, 103):56331,
  176   Xs = [1, 2, 3, 4, 5] ;
  177   From = ip(192, 168, 1, 104):3217,
  178   Xs = [1, 2, 3, 4, 5].
  179==
  180
  181All incomming trafic is handled  by  a   single  thread  with  the alias
  182`udp_inbound_proxy`. This thread also performs  the internal dispatching
  183using broadcast/1 and broadcast_request/1. Future   versions may provide
  184for handling these requests in separate threads.
  185
  186
  187## Caveats {#udp-broadcase-caveats}
  188
  189While the implementation is mostly transparent, there are some important
  190and subtle differences that must be taken into consideration:
  191
  192    * UDP broadcast requires an initialization step in order to
  193    launch the broadcast listener proxy. See
  194    udp_broadcast_initialize/2.
  195
  196    * Prolog's broadcast_request/1 is nondet. It sends the request,
  197    then evaluates the replies synchronously, backtracking as needed
  198    until a satisfactory reply is received. The remaining potential
  199    replies are not evaluated.  With UDP, all peers will send all
  200    answers to the query.  The receiver may however stop listening.
  201
  202    * A UDP broadcast/1 is completely asynchronous.
  203
  204    * A  UDP broadcast_request/1 is partially synchronous. A
  205    broadcast_request/1 is sent, then the sender balks for a period of
  206    time (default: 250 ms) while the replies are collected. Any reply
  207    that is received after this period is silently discarded. A
  208    optional second argument is provided so that a sender may specify
  209    more (or less) time for replies.
  210
  211    * Replies are presented to the user as a choice point on arrival,
  212    until the broadcast request timer finally expires. This
  213    allows traffic to propagate through the system faster and provides
  214    the requestor with the opportunity to terminate a broadcast request
  215    early if desired, by simply cutting choice points.
  216
  217    * Please beware that broadcast request transactions remain active
  218    and resources consumed until broadcast_request finally fails on
  219    backtracking, an uncaught exception occurs, or until choice points
  220    are cut. Failure to properly manage this will likely result in
  221    chronic exhaustion of UDP sockets.
  222
  223    * If a listener is connected to a generator that always succeeds
  224    (e.g. a random number generator), then the broadcast request will
  225    never terminate and trouble is bound to ensue.
  226
  227    * broadcast_request/1 with =|udp_subnet|= scope is _not_ reentrant.
  228    If a listener performs a broadcast_request/1 with UDP scope
  229    recursively, then disaster looms certain. This caveat does not apply
  230    to a UDP scoped broadcast/1, which can safely be performed from a
  231    listener context.
  232
  233    * UDP broadcast's capacity is not infinite. While it can tolerate
  234    substantial bursts of activity, it is designed for short bursts of
  235    small messages. Unlike TIPC, UDP is unreliable and has no QOS
  236    protections. Congestion is likely to cause trouble in the form of
  237    non-Byzantine failure. That is, late, lost (e.g. infinitely late),
  238    or duplicate datagrams. Caveat emptor.
  239
  240    * A UDP broadcast_request/1 term that is grounded is considered to
  241    be a broadcast only. No replies are collected unless the there is at
  242    least one unbound variable to unify.
  243
  244    * A UDP broadcast/1 always succeeds, even if there are no
  245    listeners.
  246
  247    * A UDP broadcast_request/1 that receives no replies will fail.
  248
  249    * Replies may be coming from many different places in the network
  250    (or none at all). No ordering of replies is implied.
  251
  252    * Prolog terms are sent to others after first converting them to
  253    atoms using term_string/3.  Serialization does not deal with cycles,
  254    attributes or sharing.   The hook udp_term_string_hook/3 may be
  255    defined to change the message serialization and support different
  256    message formats and/or encryption.
  257
  258    * The broadcast model is based on anonymity and a presumption of
  259    trust--a perfect recipe for compromise. UDP is an Internet protocol.
  260    A UDP broadcast listener exposes a public port, which is
  261    static and shared by all listeners, and a private port, which is
  262    semi-static and unique to the listener instance. Both can be seen
  263    from off-cluster nodes and networks. Usage of this module exposes
  264    the node and consequently, the cluster to significant security
  265    risks. So have a care when designing your application. You must talk
  266    only to those who share and contribute to your concerns using a
  267    carefully prescribed protocol.
  268
  269    * UDP broadcast categorically and silently ignores all message
  270    traffic originating from or terminating on nodes that are not
  271    members of the local subnet. This security measure only keeps honest
  272    people honest!
  273
  274@author    Jeffrey Rosenwald (JeffRose@acm.org), Jan Wielemaker
  275@license   BSD-2
  276@see       tipc.pl
  277*/
  278
  279:- multifile
  280    udp_term_string_hook/3,                     % +Scope, ?Term, ?String
  281    udp_unicast_join_hook/3,                    % +Scope, +From, +Data
  282    black_list/1.                               % +Term
  283
  284:- meta_predicate
  285    safely(0),
  286    safely_det(0).  287
  288safely(Predicate) :-
  289    Err = error(_,_),
  290    catch(Predicate, Err,
  291          print_message_fail(Err)).
  292
  293safely_det(Predicate) :-
  294    Err = error(_,_),
  295    catch(Predicate, Err,
  296          print_message_fail(Err)),
  297    !.
  298safely_det(_).
  299
  300print_message_fail(Term) :-
  301    print_message(error, Term),
  302    fail.
  303
  304udp_broadcast_address(IPAddress, Subnet, BroadcastAddress) :-
  305    IPAddress = ip(A1, A2, A3, A4),
  306    Subnet = ip(S1, S2, S3, S4),
  307    BroadcastAddress = ip(B1, B2, B3, B4),
  308
  309    B1 is A1 \/ (S1 xor 255),
  310    B2 is A2 \/ (S2 xor 255),
  311    B3 is A3 \/ (S3 xor 255),
  312    B4 is A4 \/ (S4 xor 255).
  313
  314%!  udp_broadcast_service(?Scope, ?Address) is nondet.
  315%
  316%   provides the UDP broadcast address for   a  given Scope. At present,
  317%   only one scope is supported, =|udp_subnet|=.
  318
  319%!  udp_scope(?ScopeName, ?ScopeDef)
  320
  321:- dynamic
  322    udp_scope/2,
  323    udp_scope_peer/2.  324:- volatile
  325    udp_scope/2,
  326    udp_scope_peer/2.  327%
  328%  Here's a UDP proxy to Prolog's broadcast library
  329%
  330%  A sender may extend a broadcast  to  a   subnet  of  a UDP network by
  331%  specifying a =|udp_subnet|= scoping qualifier   in his/her broadcast.
  332%  The qualifier has the effect of  selecting the appropriate multi-cast
  333%  address for the transmission. Thus,  the   sender  of the message has
  334%  control over the scope of his/her traffic on a per-message basis.
  335%
  336%  All in-scope listeners receive the   broadcast and simply rebroadcast
  337%  the message locally. All broadcast replies, if any, are sent directly
  338%  to the sender via the port-id that   was received with the broadcast.
  339%
  340%  Each listener exposes two UDP ports,  a   shared  public port that is
  341%  bound to a well-known port number and   a  private port that uniquely
  342%  indentifies the listener. Broadcasts are received  on the public port
  343%  and replies are  sent  on  the   private  port.  Directed  broadcasts
  344%  (unicasts) are received on the private port   and replies are sent on
  345%  the private port.
  346
  347%  Thread 1 listens for directed traffic on the private port.
  348%
  349
  350:- dynamic
  351    udp_private_socket/3,                       % Port, Socket, FileNo
  352    udp_public_socket/4,                        % Scope, Port, Socket, FileNo
  353    udp_closed/1.				% Scope
  354
  355udp_inbound_proxy(Master) :-
  356    thread_at_exit(inbound_proxy_died),
  357    make_private_socket,
  358    thread_send_message(Master, udp_inbound_ready),
  359    udp_inbound_proxy_loop.
  360
  361udp_inbound_proxy_loop :-
  362    forall(udp_scope(Scope, ScopeData),
  363           make_public_socket(ScopeData, Scope)),
  364    retractall(udp_closed(_)),
  365    findall(FileNo, udp_socket_file_no(FileNo), FileNos),
  366    catch(dispatch_inbound(FileNos),
  367          E, dispatch_exception(E)),
  368    udp_inbound_proxy_loop.
  369
  370dispatch_exception(E) :-
  371    E = error(_,_),
  372    !,
  373    print_message(warning, E).
  374dispatch_exception(_).
  375
  376
  377%!  make_private_socket is det.
  378%
  379%   Create our private socket. This socket is used for messages that are
  380%   directed to me. Note that we only  need this for broadcast networks.
  381%   If we use a unicast network we use   our public port to contact this
  382%   specific server.
  383
  384make_private_socket :-
  385    udp_private_socket(_Port, S, _F),
  386    !,
  387    (   (   udp_scope(Scope, broadcast(_,_,_))
  388        ;   udp_scope(Scope, multicast(_,_))
  389        ),
  390        \+ udp_closed(Scope)
  391    ->  true
  392    ;   tcp_close_socket(S),
  393        retractall(udp_private_socket(_,_,_))
  394    ).
  395make_private_socket :-
  396    udp_scope(_, broadcast(_,_,_)),
  397    !,
  398    udp_socket(S),
  399    tcp_bind(S, Port),
  400    tcp_getopt(S, file_no(F)),
  401    tcp_setopt(S, broadcast),
  402    assertz(udp_private_socket(Port, S, F)).
  403make_private_socket :-
  404    udp_scope(_, multicast(_,_)),
  405    !,
  406    udp_socket(S),
  407    tcp_bind(S, Port),
  408    tcp_getopt(S, file_no(F)),
  409    assertz(udp_private_socket(Port, S, F)).
  410make_private_socket.
  411
  412%!  make_public_socket(+ScopeData, +Scope)
  413%
  414%   Create the public port Scope.
  415
  416make_public_socket(_, Scope) :-
  417    udp_public_socket(Scope, _Port, S, _),
  418    !,
  419    (   udp_closed(Scope)
  420    ->  tcp_close_socket(S),
  421        retractall(udp_public_socket(Scope, _, _, _))
  422    ;   true
  423    ).
  424make_public_socket(broadcast(_SubNet, _Broadcast, Port), Scope) :-
  425    udp_socket(S),
  426    tcp_setopt(S, reuseaddr),
  427    tcp_bind(S, Port),
  428    tcp_getopt(S, file_no(F)),
  429    assertz(udp_public_socket(Scope, Port, S, F)).
  430make_public_socket(multicast(Group, Port), Scope) :-
  431    udp_socket(S),
  432    tcp_setopt(S, reuseaddr),
  433    tcp_bind(S, Port),
  434    tcp_setopt(S, ip_add_membership(Group)),
  435    tcp_getopt(S, file_no(F)),
  436    assertz(udp_public_socket(Scope, Port, S, F)).
  437make_public_socket(unicast(Port), Scope) :-
  438    udp_socket(S),
  439    tcp_bind(S, Port),
  440    tcp_getopt(S, file_no(F)),
  441    assertz(udp_public_socket(Scope, Port, S, F)).
  442
  443udp_socket_file_no(FileNo) :-
  444    udp_private_socket(_,_,FileNo).
  445udp_socket_file_no(FileNo) :-
  446    udp_public_socket(_,_,_,FileNo).
  447
  448%!  dispatch_inbound(+FileNos)
  449%
  450%   Dispatch inbound traffic. This loop   uses  wait_for_input/3 to wait
  451%   for one or more UDP sockets and   dispatches  the requests using the
  452%   internal broadcast service. For an  incomming broadcast _request_ we
  453%   send the reply only to the  requester   and  therefore we must use a
  454%   socket that is not in broadcast mode.
  455
  456dispatch_inbound(FileNos) :-
  457    debug(udp(broadcast), 'Waiting for ~p', [FileNos]),
  458    wait_for_input(FileNos, Ready, infinite),
  459    debug(udp(broadcast), 'Ready: ~p', [Ready]),
  460    maplist(dispatch_ready, Ready),
  461    dispatch_inbound(FileNos).
  462
  463dispatch_ready(FileNo) :-
  464    udp_private_socket(_Port, Private, FileNo),
  465    !,
  466    udp_receive(Private, Data, From, [max_message_size(65535)]),
  467    debug(udp(broadcast), 'Inbound on private port', []),
  468    (   in_scope(Scope, From),
  469        udp_term_string(Scope, Term, Data) % only accept valid data
  470    ->  ld_dispatch(Private, Term, From, Scope)
  471    ;   true
  472    ).
  473dispatch_ready(FileNo) :-
  474    udp_public_socket(Scope, _PublicPort, Public, FileNo),
  475    !,
  476    udp_receive(Public, Data, From, [max_message_size(65535)]),
  477    debug(udp(broadcast), 'Inbound on public port from ~p for scope ~p',
  478          [From, Scope]),
  479    (   in_scope(Scope, From),
  480        udp_term_string(Scope, Term, Data) % only accept valid data
  481    ->  (   udp_scope(Scope, unicast(_))
  482        ->  ld_dispatch(Public, Term, From, Scope)
  483        ;   udp_private_socket(_PrivatePort, Private, _FileNo),
  484            ld_dispatch(Private, Term, From, Scope)
  485        )
  486    ;   udp_scope(Scope, unicast(_)),
  487        udp_term_string(Scope, Term, Data),
  488        unicast_out_of_scope_request(Scope, From, Term)
  489    ->  true
  490    ;   true
  491    ).
  492
  493in_scope(Scope, Address) :-
  494    udp_scope(Scope, ScopeData),
  495    in_scope(ScopeData, Scope, Address),
  496    !.
  497in_scope(Scope, From) :-
  498    debug(udp(broadcast), 'Out-of-scope ~p datagram from ~p',
  499          [Scope, From]),
  500    fail.
  501
  502in_scope(broadcast(Subnet, Broadcast, _PublicPort), _Scope, IP:_FromPort) :-
  503    udp_broadcast_address(IP, Subnet, Broadcast).
  504in_scope(multicast(_Group, _Port), _Scope, _From).
  505in_scope(unicast(_PublicPort), Scope, IP:_) :-
  506    udp_peer(Scope, IP:_).
  507
  508
  509%!  ld_dispatch(+PrivateSocket, +Term, +From, +Scope)
  510%
  511%   Locally dispatch Term received from From. If it concerns a broadcast
  512%   request, send the replies to PrivateSocket   to  From. The multifile
  513%   hook black_list/1 can be used to ignore certain messages.
  514
  515ld_dispatch(_S, Term, From, _Scope) :-
  516    debug(udp(broadcast), 'ld_dispatch(~p) from ~p', [Term, From]),
  517    fail.
  518ld_dispatch(_S, Term, _From, _Scope) :-
  519    blacklisted(Term), !.
  520ld_dispatch(S, request(Key, Term), From, Scope) :-
  521    !,
  522    forall(safely(broadcast_request(Term)),
  523           safely((udp_term_string(Scope, reply(Key,Term), Message),
  524                   udp_send(S, Message, From, [])))).
  525ld_dispatch(_S, send(Term), _From, _Scope) :-
  526    !,
  527    safely_det(broadcast(Term)).
  528ld_dispatch(_S, reply(Key, Term), From, _Scope) :-
  529    (   reply_queue(Key, Queue)
  530    ->  safely(thread_send_message(Queue, Term:From))
  531    ;   true
  532    ).
  533
  534blacklisted(send(Term))      :- black_list(Term).
  535blacklisted(request(_,Term)) :- black_list(Term).
  536blacklisted(reply(_,Term))   :- black_list(Term).
  537
  538
  539%!  reload_udp_proxy
  540%
  541%   Update the UDP relaying proxy service.   The proxy consists of three
  542%   forwarding mechanisms:
  543%
  544%     - Listen on our _scope_.  If any messages are received, hand them
  545%       to udp_broadcast/3 to be broadcasted to _scope_ or sent to a
  546%       specific recipient.
  547%     - Listen on the _scope_ public port. Incomming messages are
  548%       relayed to the internal broadcast mechanism and replies are sent
  549%       to from our private socket.
  550%     - Listen on our private port and reply using the same port.
  551
  552reload_udp_proxy :-
  553    reload_outbound_proxy,
  554    reload_inbound_proxy.
  555
  556reload_outbound_proxy :-
  557    listening(udp_broadcast, udp(_,_), _),
  558    !.
  559reload_outbound_proxy :-
  560    listen(udp_broadcast, udp(Scope,Message),
  561           udp_broadcast(Message, Scope, 0.25)),
  562    listen(udp_broadcast, udp(Scope,Message,Timeout),
  563           udp_broadcast(Message, Scope, Timeout)),
  564    listen(udp_broadcast, udp_subnet(Message),  % backward compatibility
  565           udp_broadcast(Message, subnet, 0.25)),
  566    listen(udp_broadcast, udp_subnet(Message,Timeout),
  567           udp_broadcast(Message, subnet, Timeout)).
  568
  569reload_inbound_proxy :-
  570    catch(thread_signal(udp_inbound_proxy, throw(udp_reload)),
  571          error(existence_error(thread, _),_),
  572          fail),
  573    !.
  574reload_inbound_proxy :-
  575    thread_self(Me),
  576    thread_create(udp_inbound_proxy(Me), _,
  577                  [ alias(udp_inbound_proxy),
  578                    detached(true)
  579                  ]),
  580    thread_get_message(Me, udp_inbound_ready, [timeout(10)]).
  581
  582inbound_proxy_died :-
  583    thread_self(Self),
  584    thread_property(Self, status(Status)),
  585    (   catch(recreate_proxy(Status), _, fail)
  586    ->  print_message(informational,
  587                      httpd_restarted_worker(Self))
  588    ;   done_status_message_level(Status, Level),
  589        print_message(Level,
  590                      httpd_stopped_worker(Self, Status))
  591    ).
  592
  593recreate_proxy(exception(Error)) :-
  594    recreate_on_error(Error),
  595    reload_inbound_proxy.
  596
  597recreate_on_error('$aborted').
  598recreate_on_error(time_limit_exceeded).
  599
  600done_status_message_level(true, silent) :- !.
  601done_status_message_level(exception('$aborted'), silent) :- !.
  602done_status_message_level(_, informational).
  603
  604
  605%!  udp_broadcast_close(+Scope)
  606%
  607%   Close a UDP broadcast scope.
  608
  609udp_broadcast_close(Scope) :-
  610    udp_scope(Scope, _ScopeData),
  611    !,
  612    assert(udp_closed(Scope)),
  613    reload_udp_proxy.
  614udp_broadcast_close(_).
  615
  616
  617%!  udp_broadcast(+What, +Scope, +TimeOut)
  618%
  619%   Send a broadcast request to my UDP peers in Scope. What is either of
  620%   the shape `Term:Address` to send Term to a specific address or query
  621%   the address from which term is answered or it is a plain `Term`.
  622%
  623%   If `Term` is  nonground,  it  is   considered  is  a  _request_ (see
  624%   broadcast_request/1) and the predicate  succeeds   for  each  answer
  625%   received within TimeOut seconds. If Term is ground it is considered
  626%   an asynchronous broadcast and udp_broadcast/3 is deterministic.
  627
  628udp_broadcast(Term:To, Scope, _Timeout) :-
  629    ground(Term), ground(To),           % broadcast to single listener
  630    !,
  631    udp_basic_broadcast(send(Term), Scope, single(To)).
  632udp_broadcast(Term, Scope, _Timeout) :-
  633    ground(Term),                       % broadcast to all listeners
  634    !,
  635    udp_basic_broadcast(send(Term), Scope, broadcast).
  636udp_broadcast(Term:To, Scope, Timeout) :-
  637    ground(To),                         % request to single listener
  638    !,
  639    setup_call_cleanup(
  640        request_queue(Id, Queue),
  641        ( udp_basic_broadcast(request(Id, Term), Scope, single(To)),
  642          udp_br_collect_replies(Queue, Timeout, Term:To)
  643        ),
  644        destroy_request_queue(Queue)).
  645udp_broadcast(Term:From, Scope, Timeout) :-
  646    !,                                  % request to all listeners, collect sender
  647    setup_call_cleanup(
  648        request_queue(Id, Queue),
  649        ( udp_basic_broadcast(request(Id, Term), Scope, broadcast),
  650          udp_br_collect_replies(Queue, Timeout, Term:From)
  651        ),
  652        destroy_request_queue(Queue)).
  653udp_broadcast(Term, Scope, Timeout) :-  % request to all listeners
  654    udp_broadcast(Term:_, Scope, Timeout).
  655
  656:- dynamic
  657    reply_queue/2.  658
  659request_queue(Id, Queue) :-
  660    Id is random(1<<63),
  661    message_queue_create(Queue),
  662    asserta(reply_queue(Id, Queue)).
  663
  664destroy_request_queue(Queue) :-         % leave queue to GC
  665    retractall(reply_queue(_, Queue)).
  666
  667
  668%!  udp_basic_broadcast(+Term, +Dest) is multi.
  669%
  670%   Create a UDP private socket and use it   to send Term to Address. If
  671%   Address is our broadcast address, set the socket in broadcast mode.
  672%
  673%   This predicate succeeds with a choice   point. Committing the choice
  674%   point closes S.
  675%
  676%   @arg Dest is one of single(Target) or `broadcast`.
  677
  678udp_basic_broadcast(Term, Scope, Dest) :-
  679    debug(udp(broadcast), 'UDP proxy outbound ~p to ~p', [Term, Dest]),
  680    udp_term_string(Scope, Term, String),
  681    udp_send_message(Dest, String, Scope).
  682
  683udp_send_message(single(Address), String, Scope) :-
  684    (   udp_scope(Scope, unicast(_))
  685    ->  udp_public_socket(Scope, _Port, S, _)
  686    ;   udp_private_socket(_Port, S, _F)
  687    ),
  688    safely(udp_send(S, String, Address, [])).
  689udp_send_message(broadcast, String, Scope) :-
  690    (   udp_scope(Scope, unicast(_))
  691    ->  udp_public_socket(Scope, _Port, S, _),
  692        forall(udp_peer(Scope, Address),
  693               ( debug(udp(broadcast), 'Unicast to ~p', [Address]),
  694                 safely(udp_send(S, String, Address, []))))
  695    ;   udp_scope(Scope, broadcast(_SubNet, Broadcast, Port))
  696    ->  udp_private_socket(_PrivatePort, S, _F),
  697        udp_send(S, String, Broadcast:Port, [])
  698    ;   udp_scope(Scope, multicast(Group, Port))
  699    ->  udp_private_socket(_PrivatePort, S, _F),
  700        udp_send(S, String, Group:Port, [])
  701    ).
  702
  703% ! udp_br_collect_replies(+Queue, +TimeOut, -TermAndFrom) is nondet.
  704%
  705%   Collect replies on Socket for  TimeOut   seconds.  Succeed  for each
  706%   received message.
  707
  708udp_br_collect_replies(Queue, Timeout, Reply) :-
  709    get_time(Start),
  710    Deadline is Start+Timeout,
  711    repeat,
  712       (   thread_get_message(Queue, Reply,
  713                              [ deadline(Deadline)
  714                              ])
  715       ->  true
  716       ;   !,
  717           fail
  718       ).
  719
  720%!  udp_broadcast_initialize(+IPAddress, +Options) is semidet.
  721%
  722%   Initialized UDP broadcast bridge. IPAddress is the IP address on the
  723%   network we want to broadcast on.  IP addresses are terms ip(A,B,C,D)
  724%   or an atom or string of the format =|A.B.C.D|=.   Options processed:
  725%
  726%     - scope(+ScopeName)
  727%     Name of the scope.  Default is `subnet`.
  728%     - subnet_mask(+SubNet)
  729%     Subnet to broadcast on.  This uses the same syntax as IPAddress.
  730%     Default classifies the network as class A, B or C depending on
  731%     the the first octet and applies the default mask.
  732%     - port(+Port)
  733%     Public port to use.  Default is 20005.
  734%     - method(+Method)
  735%     Method to send a message to multiple peers.  One of
  736%       - broadcast
  737%       Use UDP broadcast messages to the LAN.  This is the
  738%       default
  739%       - multicast
  740%       Use UDP multicast messages.  This can be used on WAN networks,
  741%       provided the intermediate routers understand multicast.
  742%       - unicast
  743%       Send the messages individually to all registered peers.
  744%
  745%   For compatibility reasons Options may be the subnet mask.
  746
  747udp_broadcast_initialize(IP, Options) :-
  748    with_mutex(udp_broadcast,
  749               udp_broadcast_initialize_sync(IP, Options)).
  750
  751udp_broadcast_initialize_sync(IP, Options) :-
  752    nonvar(Options),
  753    Options = ip(_,_,_,_),
  754    !,
  755    udp_broadcast_initialize(IP, [subnet_mask(Options)]).
  756udp_broadcast_initialize_sync(IP, Options) :-
  757    to_ip4(IP, IPAddress),
  758    option(method(Method), Options, broadcast),
  759    must_be(oneof([broadcast, multicast, unicast]), Method),
  760    udp_broadcast_initialize_sync(Method, IPAddress, Options),
  761    reload_udp_proxy.
  762
  763udp_broadcast_initialize_sync(broadcast, IPAddress, Options) :-
  764    option(subnet_mask(Subnet), Options, _),
  765    mk_subnet(Subnet, IPAddress, Subnet4),
  766    option(port(Port), Options, 20005),
  767    option(scope(Scope), Options, subnet),
  768
  769    udp_broadcast_address(IPAddress, Subnet4, Broadcast),
  770    udp_broadcast_close(Scope),
  771    assertz(udp_scope(Scope, broadcast(Subnet4, Broadcast, Port))).
  772udp_broadcast_initialize_sync(unicast, _IPAddress, Options) :-
  773    option(port(Port), Options, 20005),
  774    option(scope(Scope), Options, subnet),
  775    udp_broadcast_close(Scope),
  776    assertz(udp_scope(Scope, unicast(Port))).
  777udp_broadcast_initialize_sync(multicast, IPAddress, Options) :-
  778    option(port(Port), Options, 20005),
  779    option(scope(Scope), Options, subnet),
  780    udp_broadcast_close(Scope),
  781    multicast_address(IPAddress),
  782    assertz(udp_scope(Scope, multicast(IPAddress, Port))).
  783
  784to_ip4(Atomic, ip(A,B,C,D)) :-
  785    atomic(Atomic),
  786    !,
  787    (   split_string(Atomic, ".", "", Strings),
  788        maplist(number_string, [A,B,C,D], Strings)
  789    ->  true
  790    ;   syntax_error(illegal_ip_address)
  791    ).
  792to_ip4(IP, IP).
  793
  794mk_subnet(Var, IP, Subnet) :-
  795    var(Var),
  796    !,
  797    (   default_subnet(IP, Subnet)
  798    ->  true
  799    ;   domain_error(ip_with_subnet, IP)
  800    ).
  801mk_subnet(Subnet, _, Subnet4) :-
  802    to_ip4(Subnet, Subnet4).
  803
  804%!  default_subnet(+IP, -NetWork)
  805%
  806%   Determine the default network address from an IP address. This
  807%   classifies the network as class A, B or C.
  808%
  809%   @see https://docs.oracle.com/cd/E19504-01/802-5753/planning3-78185/index.html
  810
  811default_subnet(ip(A,_,_,_), ip(A,0,0,0)) :-
  812    between(0,127, A), !.
  813default_subnet(ip(A,B,_,_), ip(A,B,0,0)) :-
  814    between(128,191, A), !.
  815default_subnet(ip(A,B,C,_), ip(A,B,C,0)) :-
  816    between(192,223, A), !.
  817
  818multicast_address(ip(A,_,_,_)) :-
  819    between(224, 239, A),
  820    !.
  821multicast_address(IP) :-
  822    domain_error(multicast_network, IP).
  823
  824
  825		 /*******************************
  826		 *          UNICAST PEERS	*
  827		 *******************************/
  828
  829%!  udp_peer_add(+Scope, +Address) is det.
  830%!  udp_peer_del(+Scope, ?Address) is det.
  831%!  udp_peer(?Scope, ?Address) is nondet.
  832%
  833%   Manage and query the set  of  known   peers  for  a unicast network.
  834%   Address is either a term  IP:Port  or   a  plain  IP address. In the
  835%   latter case the default port registered with the scope is used.
  836%
  837%   @arg Address has canonical form ip(A,B,C,D):Port.
  838
  839udp_peer_add(Scope, Address) :-
  840    must_be(ground, Address),
  841    peer_address(Address, Scope, Canonical),
  842    (   udp_scope_peer(Scope, Canonical)
  843    ->  true
  844    ;   assertz(udp_scope_peer(Scope, Canonical))
  845    ).
  846
  847udp_peer_del(Scope, Address) :-
  848    peer_address(Address, Scope, Canonical),
  849    retractall(udp_scope_peer(Scope, Canonical)).
  850
  851udp_peer(Scope, IPAddress) :-
  852    udp_scope_peer(Scope, IPAddress).
  853
  854peer_address(IP:Port, _Scope, IPAddress:Port) :-
  855    !,
  856    to_ip4(IP, IPAddress).
  857peer_address(IP, Scope, IPAddress:Port) :-
  858    (   udp_scope(Scope, unicast(Port))
  859    ->  true
  860    ;   existence_error(udp_scope, Scope)
  861    ),
  862    to_ip4(IP, IPAddress).
  863
  864
  865
  866		 /*******************************
  867		 *             HOOKS		*
  868		 *******************************/
  869
  870%!  udp_term_string_hook(+Scope, +Term, -String) is det.
  871%!  udp_term_string_hook(+Scope, -Term, +String) is semidet.
  872%
  873%   Hook  for  serializing  the  message    Term.   The  default  writes
  874%   =|%prolog\n|=, followed by the Prolog term  in quoted notation while
  875%   ignoring operators. This hook may use alternative serialization such
  876%   as fast_term_serialized/2, use  library(ssl)   to  realise encrypted
  877%   messages, etc.
  878%
  879%   @arg Scope is the scope for which the message is broadcasted.  This
  880%   can be used to use different serialization for different scopes.
  881%   @arg Term encapsulates the term broadcasted by the application as
  882%   follows:
  883%
  884%     - send(ApplTerm)
  885%       Is sent by broadcast(udp(Scope, ApplTerm))
  886%     - request(Id,ApplTerm)
  887%       Is sent by broadcast_request/1, where Id is a unique large
  888%       (64 bit) integer.
  889%     - reply(Id,ApplTerm)
  890%       Is sent to reply on a broadcast_request/1 request that has
  891%       been received.  Arguments are the same as above.
  892%
  893%   @throws The hook may throw udp(invalid_message) to stop processing
  894%   the message.
  895
  896%!  udp_term_string(+Scope, +Term, -String) is det.
  897%!  udp_term_string(+Scope, -Term, +String) is semidet.
  898%
  899%   Serialize an arbitrary Prolog  term  as   a  string.  The  string is
  900%   prefixed by a magic key to ensure   we only accept messages that are
  901%   meant for us.
  902%
  903%   In mode (+,-), Term is written with the options ignore_ops(true) and
  904%   quoted(true).
  905%
  906%   This predicate first calls  udp_term_string_hook/3.
  907
  908udp_term_string(Scope, Term, String) :-
  909    catch(udp_term_string_hook(Scope, Term, String), udp(Error), true),
  910    !,
  911    (   var(Error)
  912    ->  true
  913    ;   Error == invalid_message
  914    ->  fail
  915    ;   throw(udp(Error))
  916    ).
  917udp_term_string(_Scope, Term, String) :-
  918    (   var(String)
  919    ->  format(string(String), '%-prolog-\n~W',
  920               [ Term,
  921                 [ ignore_ops(true),
  922                   quoted(true)
  923                 ]
  924               ])
  925    ;   sub_string(String, 0, _, _, '%-prolog-\n'),
  926        term_string(Term, String,
  927                    [ syntax_errors(quiet)
  928                    ])
  929    ).
  930
  931%!  unicast_out_of_scope_request(+Scope, +From, +Data) is semidet.
  932
  933%!  udp_unicast_join_hook(+Scope, +From, +Data) is semidet.
  934%
  935%   This multifile hook is called if an   UDP package is received on the
  936%   port of the unicast network identified by  Scope. From is the origin
  937%   IP and port and Data is  the   message  data that is deserialized as
  938%   defined for the scope (see udp_term_string/3).
  939%
  940%   This hook is intended to initiate a  new node joining the network of
  941%   peers. We could in theory also  omit   the  in-scope  test and use a
  942%   normal broadcast to join. Using a different channal however provides
  943%   a basic level of security. A   possibe  implementation is below. The
  944%   first fragment is a hook  added  to   the  server,  the  second is a
  945%   predicate added to a client and the   last  initiates the request in
  946%   the client. The excanged term (join(X)) can   be  used to exchange a
  947%   welcome handshake.
  948%
  949%
  950%   ```
  951%   :- multifile udp_broadcast:udp_unicast_join_hook/3.
  952%   udp_broadcast:udp_unicast_join_hook(Scope, From, join(welcome)) :-
  953%       udp_peer_add(Scope, From),
  954%   ```
  955%
  956%   ```
  957%   join_request(Scope, Address, Reply) :-
  958%       udp_peer_add(Scope, Address),
  959%       broadcast_request(udp(Scope, join(X))).
  960%   ```
  961%
  962%   ```
  963%   ?- join_request(myscope, "1.2.3.4":10001, Reply).
  964%   Reply = welcome.
  965%   ```
  966
  967unicast_out_of_scope_request(Scope, From, send(Term)) :-
  968    udp_unicast_join_hook(Scope, From, Term).
  969unicast_out_of_scope_request(Scope, From, request(Key, Term)) :-
  970    udp_unicast_join_hook(Scope, From, Term),
  971    udp_public_socket(Scope, _Port, Socket, _FileNo),
  972    safely((udp_term_string(Scope, reply(Key,Term), Message),
  973            udp_send(Socket, Message, From, [])))