/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2009-2013, Jeffrey Rosenwald
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(tipc_broadcast,
          [ tipc_host_to_address/2,     % ?Host, ?Address
            tipc_initialize/0
          ]).
:- use_module(library(tipc/tipc),[tipc_initialize/0]).
:- autoload(tipc,
            [ tipc_get_name/2,
              tipc_send/4,
              tipc_socket/2,
              tipc_close_socket/1,
              tipc_setopt/2,
              tipc_bind/3,
              tipc_receive/4
            ]).
:- autoload(library(broadcast),
            [broadcast_request/1,broadcast/1,listen/3,unlisten/1]).
:- use_module(library(debug),[assertion/1]).
:- autoload(library(time),
            [call_with_time_limit/2,alarm/3,remove_alarm/1]).

:- require([ thread_self/1
           , forall/2
           , term_to_atom/2
           , thread_send_message/2
           , catch/3
           , setup_call_cleanup/3
           , thread_create/3
           ]).

:- meta_predicate safely(0), eventually_implies(0,0), ~>(0,0).

tipc_broadcast_service(node,            name_seq(20005, 0, 0)).
tipc_broadcast_service(cluster,         name_seq(20005, 1, 1)).
tipc_broadcast_service(zone,            name_seq(20005, 2, 2)).

%
%  Here's a TIPC bridge to Prolog's broadcast library
%
%  A sender may confine a broadcast to a subset of a TIPC network by
%  specifying a scoping qualifier in his/her broadcast. The qualifier
%  has the effect of selecting the appropriate multi-cast address for
%  the transmission. Thus, the sender of the message has control over
%  the scope of his/her traffic on a per-message basis.
%
%  All in-scope listeners receive the broadcast and simply rebroadcast
%  the message locally. All broadcast replies, if any, are sent directly
%  to the sender via the port-id that was received with the broadcast.
%  No additional multiplexing is required.
%

safely(Predicate) :-
    catch(Predicate, Err,
          (Err == '$aborted' -> (!, fail);
          print_message(error, Err), fail)).
Described practically:
P ~> Q declares that if P is true, then Q must be true, now or at some point in the future.
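The deferred behaviour can be seen in isolation with a small sketch. It carries its own copy of the operator and definition so that it loads on its own in the `user` module; the `demo/0` predicate and the messages it prints are illustrative only and are not part of this library.

```prolog
:- use_module(library(debug), [assertion/1]).

:- op(950, xfy, ~>).
:- meta_predicate ~>(0, 0).

% Stand-alone copy of the definition, for illustration only.
% P runs once as the setup goal. The disjunction leaves a choice
% point behind, so the cleanup, assertion(Q), is postponed until
% that choice point is pruned by a cut, backtracked into, or
% unwound by an exception.
P ~> Q :-
    setup_call_cleanup(P, ( Solution = yes ; Solution = no ), assertion(Q)),
    Solution = yes.

% Hypothetical demonstration: "acquire" is printed at once,
% "release" only when the final cut prunes the pending choice point.
demo :-
    format("acquire~n") ~> format("release~n"),
    format("work~n"),
    !.
```

This is why the clauses in this file that use `~>` to pair `tipc_socket/2` with `tipc_close_socket/1` rely on a later cut, failure, or exception to close the socket.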
eventually_implies(P, Q) :-
    setup_call_cleanup(P, ( Solution = yes ; Solution = no ), assertion(Q)),
    Solution = yes.

:- op(950, xfy, ~>).

~>(P, Q) :-
    eventually_implies(P, Q).

ld_dispatch(S, '$tipc_request'(wru(Name)), From) :-
    !, tipc_get_name(S, Name),
    term_to_atom(wru(Name), Atom),
    tipc_send(S, Atom, From, []).

ld_dispatch(S, '$tipc_request'(Term), From) :-
    !, forall(broadcast_request(Term),
          (   term_to_atom(Term, Atom),
              tipc_send(S, Atom, From, []))).

ld_dispatch(_S, Term, _From) :-
    safely(broadcast(Term)).

tipc_listener_daemon(Parent) :-
    tipc_socket(S, rdm) ~> tipc_close_socket(S),

%       tipc_setopt(S, importance(medium)),
    tipc_setopt(S, dest_droppable(true)),  % discard if not deliverable

    forall(tipc_broadcast_service(Scope, Address),
         tipc_bind(S, Address, scope(Scope))),

    listen(tipc_broadcast, Head, broadcast_listener(Head))
         ~> unlisten(tipc_broadcast),

    thread_send_message(Parent, tipc_listener_daemon_ready),

    repeat,
    safely(dispatch_traffic(S)).

dispatch_traffic(S) :-
    tipc_receive(S, Data, From, [as(atom)]),
    term_to_atom(Term, Data),
    ld_dispatch(S, Term, From),
    !,
    dispatch_traffic(S).

start_tipc_listener_daemon :-
    catch(thread_property(tipc_listener_daemon, status(running)),_, fail),
    !.

start_tipc_listener_daemon :-
    thread_self(Self),
    thread_create(tipc_listener_daemon(Self), _,
           [alias(tipc_listener_daemon), detached(true)]),
    call_with_time_limit(6.0,
                         thread_get_message(tipc_listener_daemon_ready)).

:- multifile tipc:host_to_address/2.
%
broadcast_listener(tipc_host_to_address(Host, Addr)) :-
    tipc:host_to_address(Host, Addr).

broadcast_listener(tipc_broadcast_service(Class, Addr)) :-
    tipc_broadcast_service(Class, Addr).

broadcast_listener(tipc_node(X)) :-
    tipc_broadcast(X, node, 0.250).
broadcast_listener(tipc_cluster(X)) :-
    tipc_broadcast(X, cluster, 0.250).

broadcast_listener(tipc_zone(X)) :-
    tipc_broadcast(X, zone, 0.250).

broadcast_listener(tipc_node(X, Timeout)) :-
    tipc_broadcast(X, node, Timeout).

broadcast_listener(tipc_cluster(X, Timeout)) :-
    tipc_broadcast(X, cluster, Timeout).

broadcast_listener(tipc_zone(X, Timeout)) :-
    tipc_broadcast(X, zone, Timeout).

%
%

tipc_basic_broadcast(S, Term, Address) :-
    tipc_socket(S, rdm) ~> tipc_close_socket(S),
%       tipc_setopt(S, importance(medium)),
    term_to_atom(Term, Atom),
    safely(tipc_send(S, Atom, Address, [])).

% directed broadcast to a single listener
tipc_broadcast(Term:To, _Scope, _Timeout) :-
    ground(Term), ground(To),
    !,
    tipc_basic_broadcast(_S, Term, To),
    !.

% broadcast to all listeners
tipc_broadcast(Term, Scope, _Timeout) :-
    ground(Term),
    !,
    tipc_broadcast_service(Scope, Address),
    tipc_basic_broadcast(_S, Term, Address),
    !.

% directed broadcast_request to a single listener
tipc_broadcast(Term:Address, _Scope, Timeout) :-
    ground(Address),
    !,
    tipc_basic_broadcast(S, '$tipc_request'(Term), Address),
    tipc_br_collect_replies(S, Timeout, Term:Address).

% broadcast_request to all listeners returning responder port-id
tipc_broadcast(Term:From, Scope, Timeout) :-
    !, tipc_broadcast_service(Scope, Address),
    tipc_basic_broadcast(S, '$tipc_request'(Term), Address),
    tipc_br_collect_replies(S, Timeout, Term:From).

% broadcast_request to all listeners ignoring responder port-id
tipc_broadcast(Term, Scope, Timeout) :-
    tipc_broadcast(Term:_, Scope, Timeout).

tipc_br_send_timeout(Port) :-
    tipc_socket(S, rdm) ~> tipc_close_socket(S),

    tipc_setopt(S, importance(critical)),
    tipc_send(S, '$tipc_br_timeout', Port, []),
    !.
tipc_br_collect_replies(S, Timeout, Term:From) :-
    tipc_get_name(S, Port),
    alarm(Timeout, tipc_br_send_timeout(Port), Id)
         ~> remove_alarm(Id),
    tipc_setopt(S, dispatch(false)),
    repeat,
    tipc_receive(S, Atom, From1, [as(atom)]),
    (   (Atom \== '$tipc_br_timeout')
        -> (From1 = From, safely(term_to_atom(Term, Atom)))
        ;  (!, fail)).
A service is advertised by name by including a fact of the form
tipc:host_to_address(+Service, +Address) somewhere in its source.
This predicate can also be used to perform reverse searches. That
is, it will also resolve an Address to a Service name. The search is
zone-wide. Locating a service, however, does not imply that the
service is actually reachable from any particular node within the
zone.
tipc_host_to_address(Host, Address) :-
    broadcast_request(tipc_zone(tipc_host_to_address(Host, Address))).
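As a rough sketch of how a service might advertise itself and then be looked up in either direction: the service name `my_service` and the address `name(20010, 1, 0)` are hypothetical, and the example assumes that the library is installed as `library(tipc/tipc_broadcast)`, that `name/3` is an acceptable TIPC address term, and that a TIPC stack is running on the host.

```prolog
:- use_module(library(tipc/tipc_broadcast)).
:- use_module(library(broadcast)).

% Start the TIPC stack and the listener daemon at load time.
:- initialization(tipc_initialize).

% Advertise a service by name. Both the name and the address
% are made up for this illustration.
:- multifile tipc:host_to_address/2.
tipc:host_to_address(my_service, name(20010, 1, 0)).

% Forward search: resolve a service name to its address.
where_is(Service, Address) :-
    tipc_host_to_address(Service, Address).

% Reverse search: resolve an address back to a service name.
who_is(Address, Service) :-
    tipc_host_to_address(Service, Address).
```

Both lookups go out as a zone-scoped broadcast_request, so they only return answers from processes whose listener daemon replies before the default timeout.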
:- multifile tipc:tipc_stack_initialize/0.


%   tipc_stack_initialize() is det. Causes any required runtime
%   initialization to occur. This is called as a side effect of
%   tipc_initialize/0, which is now required to be included in an
%   application's initialization directive.
%
tipc:tipc_stack_initialize :-
    start_tipc_listener_daemon.
A TIPC Broadcast Bridge
SWI-Prolog's broadcast library supports publish-and-subscribe communication between anonymous members of a community of interest. The members of the community are, however, necessarily limited to a single instance of Prolog. The TIPC broadcast library removes that restriction. With this library loaded, any member of a TIPC network that also has this library loaded may hear and respond to your broadcasts. Using TIPC Broadcast, it becomes a nearly trivial matter to build the kind of supercomputer that researchers in the high-performance computing community refer to as "Beowulf Class Cluster Computers."
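This module has no public predicates. When this module is initialized, it does three things:

1. It starts a listener daemon thread that listens for broadcasts from others, received as TIPC datagrams.
2. It registers three listeners: tipc_node/1, tipc_cluster/1, and tipc_zone/1.
3. It registers three listeners: tipc_node/2, tipc_cluster/2, and tipc_zone/2.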
A broadcast/1 or broadcast_request/1 that is not directed to one of the six listeners above behaves as usual and is confined to the instance of Prolog that originated it. But when so directed, the broadcast will be sent to all participating systems, including itself, by way of TIPC's multicast addressing facility. A TIPC broadcast or broadcast request takes the typical form: broadcast(tipc_node(+Term, +Timeout)). The principal functors tipc_node, tipc_cluster, and tipc_zone specify the scope of the broadcast. The functor tipc_node specifies that the broadcast is to be confined to members of a present TIPC node. Likewise, tipc_cluster and tipc_zone specify that the traffic should be confined to members of a present TIPC cluster and zone, respectively. To prevent the potential for feedback loops, the scope qualifier is stripped from the message before transmission. The timeout is optional. It specifies the amount of time to wait for replies to arrive in response to a broadcast_request. The default period is 0.250 seconds. The timeout is ignored for broadcasts.

An example of three separate processes cooperating on the same Node:
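A minimal sketch of such a session, assuming each process has loaded this library as `library(tipc/tipc_broadcast)` and runs on a host with a working TIPC stack. The event name `number/1`, the ranges, and the helper predicates are illustrative; the order in which replies arrive is not guaranteed.

```prolog
:- use_module(library(tipc/tipc_broadcast)).
:- use_module(library(broadcast)).

% Start the TIPC stack and the listener daemon at load time.
:- initialization(tipc_initialize).

% Processes A and B each subscribe to number/1 requests and
% enumerate their own range on backtracking, for example:
%
%   Process A:  ?- serve_numbers(1, 5).
%   Process B:  ?- serve_numbers(7, 9).
serve_numbers(Low, High) :-
    listen(number(X), between(Low, High, X)).

% Process C collects every answer that arrives from the node
% before the default 0.250 second timeout expires.
%
%   Process C:  ?- collect_numbers(Xs).
collect_numbers(Xs) :-
    findall(X, broadcast_request(tipc_node(number(X))), Xs).

% The same request with an explicit timeout, in seconds.
collect_numbers(Timeout, Xs) :-
    findall(X, broadcast_request(tipc_node(number(X), Timeout)), Xs).
```

Process C needs no knowledge of A or B: the `tipc_node` qualifier alone selects the scope, and it is stripped before the request reaches the remote listeners.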
It is also possible to carry on a private dialog with a single responder. To do this, you supply a compound of the form, Term:PortId, to a TIPC scoped broadcast/1 or broadcast_request/1, where PortId is the port-id of the intended listener. If you supply an unbound variable, PortId, to broadcast_request, it will be unified with the address of the listener that responds to Term. You may send a directed broadcast to a specific member by simply providing this address in a similarly structured compound to a TIPC scoped broadcast/1. The message is sent via unicast to that member only by way of the member's broadcast listener. It is received by the listener just as any other broadcast would be. The listener does not know the difference.
Although this capability is needed under some circumstances, it has a tendency to compromise the resilience of the broadcast model. You should not rely on it too heavily, or fault tolerance will suffer.
For example, in order to discover who responded with a particular value:
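One way this might look, assuming the same setup as before (this library loaded as `library(tipc/tipc_broadcast)`, a working TIPC stack, and other processes on the node listening for the illustrative `number/1` event). The helper predicates are hypothetical.

```prolog
:- use_module(library(tipc/tipc_broadcast)).
:- use_module(library(broadcast)).

% Start the TIPC stack and the listener daemon at load time.
:- initialization(tipc_initialize).

% Ask every listener on the node for number(Value). From is left
% unbound, so it is unified with the port-id of each responder.
%
%   ?- who_answered(7, From).
who_answered(Value, From) :-
    broadcast_request(tipc_node(number(Value):From)).

% Follow up with a directed broadcast to one responder only.
% Term and From must both be ground for the message to be sent
% via unicast to that member.
notify(From, Term) :-
    broadcast(tipc_node(Term:From)).
```

Because `who_answered/2` is nondeterministic, backtracking into it yields one solution per reply received before the timeout.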
Caveats
While the implementation is mostly transparent, there are some important and subtle differences that must be taken into consideration:
tipc.pl