/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2009-2013, Jeffrey Rosenwald
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(tipc_broadcast,
          [ tipc_host_to_address/2,     % ?Host, ?Address
            tipc_initialize/0
          ]).
:- use_module(library(tipc/tipc),[tipc_initialize/0]).

/** <module> A TIPC Broadcast Bridge

SWI-Prolog's broadcast library provides a means that may be used to
facilitate publish and subscribe communication regimes between anonymous
members of a community of interest. The members of the community are,
however, necessarily limited to a single instance of Prolog.
The TIPC
broadcast library removes that restriction. With this library loaded,
any member of a TIPC network that also has this library loaded may hear
and respond to your broadcasts. Using TIPC Broadcast, it becomes a
nearly trivial matter to build an instance of the kind of supercomputer
that researchers within the High Performance Computing community refer
to as "Beowulf Class Cluster Computers."

Apart from tipc_host_to_address/2 and tipc_initialize/0, this module has
no public predicates. When this module is initialized, it does three
things:

    * It starts a listener daemon thread that listens for
    broadcasts from others, received as TIPC datagrams, and

    * It registers three listeners: tipc_node/1, tipc_cluster/1, and
    tipc_zone/1, and

    * It registers three listeners: tipc_node/2, tipc_cluster/2, and
    tipc_zone/2.

A broadcast/1 or broadcast_request/1 that is not directed to one of the
six listeners above behaves as usual and is confined to the instance of
Prolog that originated it. But when so directed, the broadcast will be
sent to all participating systems, including itself, by way of TIPC's
multicast addressing facility. A TIPC broadcast or broadcast request
takes the typical form: =|broadcast(tipc_node(+Term, +Timeout))|=.
The principal functors =tipc_node=, =tipc_cluster=, and =tipc_zone=
specify the scope of the broadcast. The functor =tipc_node= specifies
that the broadcast is to be confined to members of the present TIPC
node. Likewise, =tipc_cluster= and =tipc_zone= specify that the traffic
should be confined to members of the present TIPC cluster and zone,
respectively. To prevent the potential for feedback loops, the scope
qualifier is stripped from the message before transmission. The timeout
is optional. It specifies the amount of time to wait for replies to
arrive in response to a broadcast_request. The default period is 0.250
seconds. The timeout is ignored for broadcasts.
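
As a hedged illustration of the optional timeout, and assuming that at
least one process in the cluster has registered a listener for the
hypothetical term number/1 (as in the example that follows), a request
that allows one full second for replies might be written as:

==
    ?- findall(X, broadcast_request(tipc_cluster(number(X), 1.0)), Xs).
==

The answers collected depend on which listeners are reachable within the
cluster scope before the timer expires.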

An example of three separate processes cooperating on the same Node:

==
Process A:

    ?- listen(number(X), between(1, 5, X)).
    true.

    ?-

Process B:

    ?- listen(number(X), between(7, 9, X)).
    true.

    ?-

Process C:

    ?- findall(X, broadcast_request(tipc_node(number(X))), Xs).
    Xs = [1, 2, 3, 4, 5, 7, 8, 9].

    ?-
==

It is also possible to carry on a private dialog with a single
responder. To do this, you supply a compound of the form, Term:PortId,
to a TIPC scoped broadcast/1 or broadcast_request/1, where PortId is the
port-id of the intended listener. If you supply an unbound variable,
PortId, to broadcast_request, it will be unified with the address of the
listener that responds to Term. You may send a directed broadcast to a
specific member by simply providing this address in a similarly
structured compound to a TIPC scoped broadcast/1. The message is sent
via unicast to that member only by way of the member's broadcast
listener. It is received by the listener just as any other broadcast
would be. The listener does not know the difference.

Although this capability is needed under some circumstances, it has a
tendency to compromise the resilience of the broadcast model. You should
not rely on it too heavily, or fault tolerance will suffer.

For example, in order to discover who responded with a particular value:

==
Process A:

    ?- listen(number(X), between(1, 3, X)).
    true.

    ?-

Process B:

    ?- listen(number(X), between(7, 9, X)).
    true.

    ?-

Process C:

    ?- broadcast_request(tipc_node(number(X):From)).
    X = 7,
    From = port_id('<1.1.1:3971170279>') ;
    X = 8,
    From = port_id('<1.1.1:3971170279>') ;
    X = 9,
    From = port_id('<1.1.1:3971170279>') ;
    X = 1,
    From = port_id('<1.1.1:3971170280>') ;
    X = 2,
    From = port_id('<1.1.1:3971170280>') ;
    X = 3,
    From = port_id('<1.1.1:3971170280>') ;
    false.

?-

==

## Caveats {#tipc-caveats}

While the implementation is mostly transparent, there are some important
and subtle differences that must be taken into consideration:

    * TIPC broadcast now requires an initialization step in order to
    launch the broadcast listener daemon. See tipc_initialize/0.

    * Prolog's broadcast_request/1 is nondet. It sends the request,
    then evaluates the replies synchronously, backtracking as needed
    until a satisfactory reply is received. The remaining potential
    replies are not evaluated. This is not so when TIPC is involved.

    * A TIPC broadcast/1 is completely asynchronous.

    * A TIPC broadcast_request/1 is partially synchronous. A
    broadcast_request/1 is sent, then the sender blocks for a period of
    time (default: 250 ms) while the replies are collected. Any reply
    that is received after this period is silently discarded. An
    optional second argument is provided so that a sender may specify
    more (or less) time for replies.

    * Replies are _|no longer|_ collected using findall/3. Replies are
    presented to the user as a choice point on arrival, until the
    broadcast request timer finally expires. This change allows
    traffic to propagate through the system faster and provides the
    requestor with the opportunity to terminate a broadcast request
    early if desired, by simply cutting choice points.

    * Please beware that broadcast request transactions will now remain
    active, and their resources consumed, until broadcast_request
    finally fails on backtracking, an uncaught exception occurs, or
    choice points are cut. Failure to properly manage this will likely
    result in chronic exhaustion of TIPC sockets.

    * If a listener is connected to a generator that always succeeds
    (e.g. a random number generator), then the broadcast request will
    never terminate and trouble is bound to ensue.

    * broadcast_request/1 with TIPC scope is _not_ reentrant (at
    least, not now anyway). If a listener performs a broadcast_request/1
    with TIPC scope recursively, then disaster is certain to follow.
    This caveat does not apply to a TIPC scoped broadcast/1, which can
    safely be performed from a listener context.

    * TIPC's capacity is not infinite. While TIPC can tolerate
    substantial bursts of activity, it is designed for short bursts of
    small messages. It can tolerate several thousand replies in response
    to a broadcast_request/1 without trouble, but it will begin to
    encounter congestion beyond that. And in congested conditions,
    things will start to become unreliable as TIPC begins prioritizing
    and/or discarding traffic.

    * A TIPC broadcast_request/1 term that is grounded is considered to
    be a broadcast only. No replies are collected unless there is at
    least one unbound variable to unify.

    * A TIPC broadcast/1 always succeeds, even if there are no
    listeners.

    * A TIPC broadcast_request/1 that receives no replies will fail.

    * Replies may be coming from many different places in the network
    (or none at all). No ordering of replies is implied.

    * Prolog terms are sent to others after first converting them to
    atoms using term_to_atom/2. Passing real numbers this way may
    result in a substantial truncation of precision.
    See prolog flag
    option, 'float_format', of current_prolog_flag/2.

@author Jeffrey Rosenwald (JeffRose@acm.org)
@license LGPL
@see tipc.pl
@compat Linux only
*/

:- autoload(tipc,
            [ tipc_get_name/2,
              tipc_send/4,
              tipc_socket/2,
              tipc_close_socket/1,
              tipc_setopt/2,
              tipc_bind/3,
              tipc_receive/4
            ]).
:- autoload(library(broadcast),
            [broadcast_request/1,broadcast/1,listen/3,unlisten/1]).
:- use_module(library(debug),[assertion/1]).
:- autoload(library(time),
            [call_with_time_limit/2,alarm/3,remove_alarm/1]).

:- require([ thread_self/1
           , forall/2
           , term_to_atom/2
           , thread_send_message/2
           , catch/3
           , setup_call_cleanup/3
           , thread_create/3
           ]).

:- meta_predicate safely(0), eventually_implies(0,0), ~>(0,0).

tipc_broadcast_service(node,            name_seq(20005, 0, 0)).
tipc_broadcast_service(cluster,         name_seq(20005, 1, 1)).
tipc_broadcast_service(zone,            name_seq(20005, 2, 2)).

%
%  Here's a TIPC bridge to Prolog's broadcast library
%
%  A sender may confine a broadcast to a subset of a TIPC network by
%  specifying a scoping qualifier in his/her broadcast. The qualifier
%  has the effect of selecting the appropriate multi-cast address for
%  the transmission. Thus, the sender of the message has control over
%  the scope of his/her traffic on a per-message basis.
%
%  All in-scope listeners receive the broadcast and simply rebroadcast
%  the message locally. All broadcast replies, if any, are sent directly
%  to the sender via the port-id that was received with the broadcast.
%  No additional multiplexing is required.
%

safely(Predicate) :-
    catch(Predicate, Err,
          (Err == '$aborted' -> (!, fail);
          print_message(error, Err), fail)).

%!  ~>(:P, :Q) is semidet.
%!  eventually_implies(:P, :Q) is semidet.
%    asserts temporal Liveness (something good happens, eventually) and
%    Safety (nothing bad ever happens) properties. Analogous to the
%    "leads-to" operator of Owicki and Lamport, 1982. Provides a sort of
%    lazy implication described informally as:
%
%    * Liveness: For all possible outcomes, P -> Q, eventually.
%    * Safety: For all possible outcomes, (\+P ; Q), is invariant.
%
%  Described practically:
%
%    P ~> Q, declares that if P is true, then Q must be true, now or at
%    some point in the future.
%

eventually_implies(P, Q) :-
    setup_call_cleanup(P, ( Solution = yes ; Solution = no ), assertion(Q)),
    Solution = yes.

:- op(950, xfy, ~>).

~>(P, Q) :-
    eventually_implies(P, Q).

ld_dispatch(S, '$tipc_request'(wru(Name)), From) :-
    !, tipc_get_name(S, Name),
    term_to_atom(wru(Name), Atom),
    tipc_send(S, Atom, From, []).

ld_dispatch(S, '$tipc_request'(Term), From) :-
    !, forall(broadcast_request(Term),
              (   term_to_atom(Term, Atom),
                  tipc_send(S, Atom, From, []))).

ld_dispatch(_S, Term, _From) :-
    safely(broadcast(Term)).

tipc_listener_daemon(Parent) :-
    tipc_socket(S, rdm) ~> tipc_close_socket(S),

%       tipc_setopt(S, importance(medium)),
    tipc_setopt(S, dest_droppable(true)),  % discard if not deliverable

    forall(tipc_broadcast_service(Scope, Address),
           tipc_bind(S, Address, scope(Scope))),

    listen(tipc_broadcast, Head, broadcast_listener(Head))
        ~> unlisten(tipc_broadcast),

    thread_send_message(Parent, tipc_listener_daemon_ready),

    repeat,
    safely(dispatch_traffic(S)).

dispatch_traffic(S) :-
    tipc_receive(S, Data, From, [as(atom)]),
    term_to_atom(Term, Data),
    ld_dispatch(S, Term, From),
    !,
    dispatch_traffic(S).

start_tipc_listener_daemon :-
    catch(thread_property(tipc_listener_daemon, status(running)),_, fail),
    !.

start_tipc_listener_daemon :-
    thread_self(Self),
    thread_create(tipc_listener_daemon(Self), _,
                  [alias(tipc_listener_daemon), detached(true)]),
    call_with_time_limit(6.0,
                         thread_get_message(tipc_listener_daemon_ready)).

:- multifile tipc:host_to_address/2.
%
broadcast_listener(tipc_host_to_address(Host, Addr)) :-
    tipc:host_to_address(Host, Addr).

broadcast_listener(tipc_broadcast_service(Class, Addr)) :-
    tipc_broadcast_service(Class, Addr).

broadcast_listener(tipc_node(X)) :-
    tipc_broadcast(X, node, 0.250).

broadcast_listener(tipc_cluster(X)) :-
    tipc_broadcast(X, cluster, 0.250).

broadcast_listener(tipc_zone(X)) :-
    tipc_broadcast(X, zone, 0.250).

broadcast_listener(tipc_node(X, Timeout)) :-
    tipc_broadcast(X, node, Timeout).

broadcast_listener(tipc_cluster(X, Timeout)) :-
    tipc_broadcast(X, cluster, Timeout).

broadcast_listener(tipc_zone(X, Timeout)) :-
    tipc_broadcast(X, zone, Timeout).

%
%

tipc_basic_broadcast(S, Term, Address) :-
    tipc_socket(S, rdm) ~> tipc_close_socket(S),
%       tipc_setopt(S, importance(medium)),
    term_to_atom(Term, Atom),
    safely(tipc_send(S, Atom, Address, [])).

% directed broadcast to a single listener
tipc_broadcast(Term:To, _Scope, _Timeout) :-
    ground(Term), ground(To),
    !,
    tipc_basic_broadcast(_S, Term, To),
    !.

% broadcast to all listeners
tipc_broadcast(Term, Scope, _Timeout) :-
    ground(Term),
    !,
    tipc_broadcast_service(Scope, Address),
    tipc_basic_broadcast(_S, Term, Address),
    !.

% directed broadcast_request to a single listener
tipc_broadcast(Term:Address, _Scope, Timeout) :-
    ground(Address),
    !,
    tipc_basic_broadcast(S, '$tipc_request'(Term), Address),
    tipc_br_collect_replies(S, Timeout, Term:Address).

% broadcast_request to all listeners returning responder port-id
tipc_broadcast(Term:From, Scope, Timeout) :-
    !, tipc_broadcast_service(Scope, Address),
    tipc_basic_broadcast(S, '$tipc_request'(Term), Address),
    tipc_br_collect_replies(S, Timeout, Term:From).

% broadcast_request to all listeners ignoring responder port-id
tipc_broadcast(Term, Scope, Timeout) :-
    tipc_broadcast(Term:_, Scope, Timeout).

tipc_br_send_timeout(Port) :-
    tipc_socket(S, rdm) ~> tipc_close_socket(S),

    tipc_setopt(S, importance(critical)),
    tipc_send(S, '$tipc_br_timeout', Port, []),
    !.

tipc_br_collect_replies(S, Timeout, Term:From) :-
    tipc_get_name(S, Port),
    alarm(Timeout, tipc_br_send_timeout(Port), Id)
        ~> remove_alarm(Id),
    tipc_setopt(S, dispatch(false)),
    repeat,
    tipc_receive(S, Atom, From1, [as(atom)]),
    (   (Atom \== '$tipc_br_timeout')
    ->  (From1 = From, safely(term_to_atom(Term, Atom)))
    ;   (!, fail)
    ).

%!  tipc_host_to_address(?Service, ?Address) is nondet.
%
%   Locates a TIPC service by name. Service is an atom or grounded term
%   representing the common name of the service. Address is a TIPC
%   address structure. A server may advertise its services by name by
%   including the fact, tipc:host_to_address(+Service, +Address),
%   somewhere in its source. This predicate can also be used to perform
%   reverse searches. That is, it will also resolve an Address to a
%   Service name. The search is zone-wide. Locating a service, however,
%   does not imply that the service is actually reachable from any
%   particular node within the zone.
%

tipc_host_to_address(Host, Address) :-
    broadcast_request(tipc_zone(tipc_host_to_address(Host, Address))).

%!  tipc_initialize is semidet.
%   See tipc:tipc_initialize/0
%
:- multifile tipc:tipc_stack_initialize/0.


%   tipc_stack_initialize() is det.
%   Causes any required runtime
%   initialization to occur. This is called as a side-effect of
%   tipc_initialize/0, which is now required to be included in an
%   application's initialization directive.
%
tipc:tipc_stack_initialize :-
    start_tipc_listener_daemon.