View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Hongxin Liang and Jan Wielemaker
    4    E-mail:        jan@swi-prolog.org
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2021, SWI-Prolog Solutions b.v.
    7    All rights reserved.
    8
    9    Redistribution and use in source and binary forms, with or without
   10    modification, are permitted provided that the following conditions
   11    are met:
   12
   13    1. Redistributions of source code must retain the above copyright
   14       notice, this list of conditions and the following disclaimer.
   15
   16    2. Redistributions in binary form must reproduce the above copyright
   17       notice, this list of conditions and the following disclaimer in
   18       the documentation and/or other materials provided with the
   19       distribution.
   20
   21    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   22    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   23    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   24    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   25    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   26    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   27    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   28    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   29    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   30    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   31    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   32    POSSIBILITY OF SUCH DAMAGE.
   33*/
   34
   35:- module(stomp,
   36          [ stomp_connection/5,    % +Address, +Host, +Headers,
   37                                   % :Callback, -Connection
   38            stomp_connection/6,    % +Address, +Host, +Headers,
   39                                   % :Callback, -Connection, +Options
   40            stomp_connection_property/2, % ?Connection, ?Property
   41            stomp_destroy_connection/1, % +Connection
   42            stomp_connect/1,       % +Connection
   43            stomp_connect/2,       % +Connection, +Options
   44            stomp_teardown/1,      % +Connection
   45            stomp_reconnect/1,	   % +Connection
   46            stomp_send/4,          % +Connection, +Destination, +Headers, +Body
   47            stomp_send_json/4,     % +Connection, +Destination, +Headers, +JSON
   48            stomp_subscribe/4,     % +Connection, +Destination, +Id, +Headers
   49            stomp_unsubscribe/2,   % +Connection, +Id
   50            stomp_ack/2,           % +Connection, +MsgHeaders
   51            stomp_nack/2,          % +Connection, +MsgHeaders
   52            stomp_ack/3,           % +Connection, +MessageId, +Headers
   53            stomp_nack/3,          % +Connection, +MessageId, +Headers
   54            stomp_transaction/2,   % +Connection, :Goal
   55            stomp_disconnect/2,    % +Connection, +Headers
   56                                   % Low level predicates
   57            stomp_begin/2,         % +Connection, +Transaction
   58            stomp_commit/2,        % +Connection, +Transaction
   59            stomp_abort/2,         % +Connection, +Transaction
   60            stomp_setup/2          % +Connection, +Options
   61          ]).   62
   63/** <module> STOMP client.
   64
   65This module provides a STOMP  (Simple   (or  Streaming)  Text Orientated
   66Messaging Protocol) client. This client  is   based  on  work by Hongxin
   67Liang. The current version is a major rewrite, both changing the API and
   68the low-level STOMP frame (de)serialization.
   69
   70The predicate stomp_connection/5 is used to   register a connection. The
   71connection is established by  stomp_connect/1,   which  is lazily called
   72from any of the predicates that send   a STOMP frame. After establishing
   73the connection two threads are created.   One  receives STOMP frames and
   74the other manages and watches the _heart beat_.
   75
   76## Threading	{#stomp-threading}
   77
   78Upon receiving a frame the   callback registered with stomp_connection/5
   79is called in  the  context  of   the  receiving  thread.  More demanding
   80applications may decide to send incomming frames to a SWI-Prolog message
   81queue and have one  or  more   _worker  threads_  processing  the input.
   82Alternatively, frames may be  inspected  by   the  receiving  thread and
   83either processed immediately or be dispatched   to either new or running
   84threads.  The best scenario depends on the message rate, processing time
   85and concurrency of the Prolog application.
   86
   87All message sending predicates of this library are _thread safe_. If two
   88threads send a frame to the  same   connection  the library ensures that
   89both frames are properly serialized.
   90
   91## Reconnecting	{#stomp-reconnecting}
   92
   93By default this library tries to establish   the connection and the user
   94gets notified by  means  of  a   `disconnected`  pseudo  frame  that the
   95connection is lost. Using the Options argument of stomp_connection/6 the
   96system can be configured to try and keep connecting if the server is not
   97available and reconnect if  the  connection   is  lost.  See the pong.pl
   98example.
   99
  100@author Hongxin Liang and Jan Wielemaker
  101@license BSD-2
  102@see http://stomp.github.io/index.html
  103@see https://github.com/jasonrbriggs/stomp.py
  104@tbd TSL support
  105*/
  106
  107:- meta_predicate
  108    stomp_connection(+, +, +, 4, -),
  109    stomp_connection(+, +, +, 4, -, +),
  110    stomp_transaction(+, 0).  111
  112:- use_module(library(apply)).  113:- use_module(library(debug)).  114:- use_module(library(error)).  115:- use_module(library(gensym)).  116:- use_module(library(http/http_stream)).  117:- use_module(library(http/json)).  118:- use_module(library(readutil)).  119:- use_module(library(socket)).  120:- use_module(library(uuid)).  121:- use_module(library(lists)).  122:- use_module(library(option)).  123:- use_module(library(time)).  124
  125:- dynamic
  126    connection_property/3.  127
  128%!  stomp_connection(+Address, +Host, +Headers, :Callback,
  129%!                   -Connection) is det.
  130%!  stomp_connection(+Address, +Host, +Headers, :Callback,
  131%!                   -Connection, +Options) is det.
  132%
  133%   Create a connection reference. The connection is   not set up yet by
  134%   this predicate. Callback is called on  any received frame except for
  135%   _heart beat_ frames as below.
  136%
  137%   ```
  138%   call(Callback, Command, Connection, Header, Body)
  139%   ```
  140%
  141%   Where command is one of  the  commands   below.  `Header`  is a dict
  142%   holding the STOMP frame header, where  all values are strings except
  143%   for the `'content-length'` key value which is passed as an integer.
  144%
  145%   Body  is  a   string   or,   if   the    data   is   of   the   type
  146%   ``application/json``, a dict.
  147%
  148%     - connected
  149%       A connection was established.  Connection and Header are valid.
  150%     - disconnected
  151%       The connection was lost.  Only Connection is valid.
  152%     - message
  153%       A message arrived.  All three arguments are valid.  Body is
  154%       a dict if the ``content-type`` of the message is
  155%       ``application/json`` and a string otherwise.
  156%     - heartbeat
  157%       A heartbeat was received.  Only Connection is valid.  This
  158%       callback is also called for each newline that follows a frame.
  159%       These newlines can be a heartbeat, but can also be additional
  160%       newlines that follow a frame.
  161%     - heartbeat_timeout
  162%       No heartbeat was received.  Only Connection is valid.
  163%     - error
  164%       An error happened.  All three arguments are valid and handled
  165%       as `message`.
  166%
  167%   Note that stomp_teardown/1 causes the receiving and heartbeat thread
  168%   to be signalled with abort/0.
  169%
  170%   Options processed:
  171%
  172%     - reconnect(+Bool)
  173%       Try to reestablish the connection to the server if the
  174%       connection is lost.  Default is `false`
  175%     - connect_timeout(+Seconds)
  176%       Maximum time to try and reestablish a connection.  The
  177%       default is `600` (10 minutes).
  178%     - json_options(+Options)
  179%       Options passed to json_read_dict/3 to translate
  180%       `application/json` content to Prolog.  Default is `[]`.
  181%
  182%   @arg Address is a valid address   for tcp_connect/3. Normally a term
  183%   Host:Port, e.g., `localhost:32772`.
  184%   @arg Host is a path denoting the STOMP host.  Often just `/`.
  185%   @arg Headers is a dict with STOMP headers used for the ``CONNECT``
  186%   request.
  187%   @arg Connection is an opaque ground term that identifies the
  188%   connection.
  189
  190stomp_connection(Address, Host, Headers, Callback, Connection) :-
  191    stomp_connection(Address, Host, Headers, Callback, Connection, []).
  192
  193stomp_connection(Address, Host, Headers, Callback, Connection, Options) :-
  194    option(reconnect(Reconnect), Options, false),
  195    option(connect_timeout(Timeout), Options, 600),
  196    option(json_options(JSONOptions), Options, []),
  197    valid_address(Address),
  198    must_be(atom, Host),
  199    must_be(dict, Headers),
  200    must_be(callable, Callback),
  201    uuid(Connection),
  202    retractall(connection_property(Connection, _, _)),
  203    update_connection_mapping(
  204        Connection,
  205        _{ address: Address,
  206           callback: Callback,
  207           host: Host,
  208           headers: Headers,
  209           reconnect: Reconnect,
  210           connect_timeout: Timeout,
  211           json_options: JSONOptions
  212         }).
  213
  214valid_address(Host:Port) :-
  215    !,
  216    must_be(atom, Host),
  217    must_be(integer, Port).
  218valid_address(Address) :-
  219    type_error(stom_address, Address).
  220
  221connection_property(address).
  222connection_property(callback).
  223connection_property(host).
  224connection_property(headers).
  225connection_property(reconnect).
  226connection_property(connect_timeout).
  227
  228%!  stomp_connection_property(?Connection, ?Property) is nondet.
  229%
  230%   True when Property, is a property  of Connection. Defined properties
  231%   are:
  232%
  233%     - address(Address)
  234%     - callback(Callback)
  235%     - host(Host)
  236%     - headers(Headers)
  237%     - reconnect(Bool)
  238%     - connect_timeout(Seconds)
  239%       All the above properties result from the stomp_connection/6
  240%       registration.
  241%     - receiver_thread_id(Thread)
  242%     - stream(Stream)
  243%     - heartbeat_thread_id(Thread)
  244%     - received_heartbeat(TimeStamp)
  245%       These describe an active STOMP connection.
  246
  247stomp_connection_property(Connection, Property) :-
  248    var(Property),
  249    !,
  250    connection_property(Connection, Name, Value),
  251    Property =.. [Name,Value].
  252stomp_connection_property(Connection, Property) :-
  253    must_be(compound, Property),
  254    Property =.. [Name,Value],
  255    query_connection_property(Connection, Name, Value).
  256
  257%!  stomp_destroy_connection(+Connection)
  258%
  259%   Destroy a connection. If it is active, first use stomp_teardown/1 to
  260%   disconnect.
  261
  262stomp_destroy_connection(Connection) :-
  263    must_be(ground, Connection),
  264    (   query_connection_property(Connection, address, _)
  265    ->  stomp_teardown(Connection),
  266        retractall(connection_property(Connection, _, _))
  267    ;   existence_error(stomp_connection, Connection)
  268    ).
  269
  270%!  stomp_setup(+Connection, +Options) is det.
  271%
  272%   Set up the actual socket connection and start receiving thread. This
  273%   is a no-op if the connection has   already been created. The Options
  274%   processed are below. Other options are passed to tcp_connect/3.
  275%
  276%     - timeout(+Seconds)
  277%       If tcp_connect/3 fails, retry until the timeout is reached.
  278%       If Seconds is `inf` or `infinite`, keep retrying forever.
  279
  280stomp_setup(Connection, Options) :-
  281    stomp_setup(Connection, _New, Options).
  282
  283stomp_setup(Connection, false, _) :-
  284    query_connection_property(Connection, stream, _Stream),
  285    !.
  286stomp_setup(Connection, New, Options) :-
  287    with_mutex(stomp, stomp_setup_guarded(Connection, New, Options)).
  288
  289stomp_setup_guarded(Connection, false, _) :-
  290    query_connection_property(Connection, stream, _Stream),
  291    !.
  292stomp_setup_guarded(Connection, true, Options) :-
  293    query_connection_property(Connection, address, Address),
  294    connect(Connection, Address, Stream, Options),
  295    set_stream(Stream, encoding(utf8)),
  296    gensym(stomp_receive, Alias),
  297    thread_create(receive(Connection, Stream), ReceiverThreadId, [alias(Alias)]),
  298    debug(stomp(connection), 'Handling input on thread ~p', [ReceiverThreadId]),
  299    update_connection_mapping(Connection,
  300                              _{ receiver_thread_id: ReceiverThreadId,
  301                                 stream:Stream
  302                               }).
  303
  304%!  connect(+Connection, +Address, -Stream, +Options) is det.
  305%
  306%   Connect to Address. If the option timeout(Sec) is present, retry the
  307%   connection until the timeout is reached.
  308
  309connect(Connection, Address, Stream, Options) :-
  310    stomp_deadline(Connection, Deadline, Options),
  311    connect_with_deadline(Connection, Address, Stream, Deadline, Options).
  312
  313connect_with_deadline(_Connection, Address, Stream, once, Options) :-
  314    !,
  315    tcp_connect(Address, Stream, Options).
  316connect_with_deadline(Connection, Address, Stream, Deadline, Options) :-
  317    number(Deadline),
  318    !,
  319    between(0, infinite, Retry),
  320      get_time(Now),
  321      Timeout is Deadline-Now,
  322      (   Now > 0
  323      ->  (   catch(call_with_time_limit(
  324                        Timeout,
  325                        tcp_connect(Address, Stream, Options)),
  326                    Error,
  327                    true)
  328          ->  (   var(Error)
  329              ->  !
  330              ;   (   debugging(stomp(connection))
  331                  ->  print_message(warning, Error)
  332                  ;   true
  333                  ),
  334                  wait_retry(Connection, Error, Retry, Deadline)
  335              )
  336          ;   wait_retry(Connection, failed, Retry, Deadline)
  337          )
  338      ;   throw(stomp_error(connect, Connection, timeout))
  339      ).
  340connect_with_deadline(Connection, Address, Stream, Deadline, Options) :-
  341    between(0, infinite, Retry),
  342      Error = error(Formal, _),
  343      (   catch(tcp_connect(Address, Stream, Options),
  344                Error,
  345                true)
  346      ->  (   var(Formal)
  347          ->  !
  348          ;   (   debugging(stomp(connection))
  349              ->  print_message(warning, Error)
  350              ;   true
  351              ),
  352              wait_retry(Connection, Formal, Retry, Deadline)
  353          )
  354      ;   wait_retry(Connection, failed, Retry, Deadline)
  355      ).
  356
  357wait_retry(Connection, Why, _Retry, _Deadline) :-
  358    Why = error(stomp_error(connect, Connection, error(_)), _),
  359    !,
  360    throw(Why).
  361wait_retry(Connection, _Why, Retry, Deadline) :-
  362    Wait0 is min(10, 0.1 * (1<<Retry)),
  363    (   number(Deadline)
  364    ->  get_time(Now),
  365        Wait is min(Deadline-Now, Wait0)
  366    ;   Wait = Wait0
  367    ),
  368    (   Wait > 0
  369    ->  sleep(Wait),
  370        fail
  371    ;   throw(error(stomp_error(connect, Connection, timeout), _))
  372    ).
  373
  374
  375%!  stomp_teardown(+Connection) is semidet.
  376%
  377%   Tear down the socket connection, stop receiving thread and heartbeat
  378%   thread (if applicable).  The  registration   of  the  connection  as
  379%   created by stomp_connection/5 is preserved and the connection may be
  380%   reconnected using stomp_connect/1.
  381
  382stomp_teardown(Connection) :-
  383    terminate_helper(Connection, receiver_thread_id),
  384    terminate_helper(Connection, heartbeat_thread_id),
  385    forall(query_connection_property(Connection, stream, Stream),
  386           close(Stream, [force(true)])),
  387    debug(stomp(connection), 'retract connection mapping for ~p', [Connection]),
  388    reset_connection_properties(Connection).
  389
  390terminate_helper(Connection, Helper) :-
  391    retract(connection_property(Connection, Helper, Thread)),
  392    \+ thread_self(Thread),
  393    catch(thread_signal(Thread, abort), error(_,_), fail),
  394    !,
  395    thread_join(Thread, _Status).
  396terminate_helper(_, _).
  397
  398reset_connection_properties(Connection) :-
  399    findall(P,
  400            (   query_connection_property(Connection, P, _),
  401                \+ connection_property(P)
  402            ), Ps),
  403    forall(member(P, Ps),
  404           retractall(connection_property(Connection, P, _))).
  405
  406%!  stomp_reconnect(+Connection) is det.
  407%
  408%   Teardown the connection and try to reconnect.
  409
  410stomp_reconnect(Connection) :-
  411    stomp_teardown(Connection),
  412    stomp_connect(Connection).
  413
  414%!  stomp_connect(+Connection) is det.
  415%!  stomp_connect(+Connection, +Options) is det.
  416%
  417%   Setup the connection. First ensures a  socket connection and if this
  418%   is new send a ``CONNECT``  frame.   Protocol  version  and heartbeat
  419%   negotiation will be  handled.  ``STOMP``  frame   is  not  used  for
  420%   backward compatibility.
  421%
  422%   This predicate waits for the connection handshake to have completed.
  423%   Currently it waits for a maximum   of  10 seconds after establishing
  424%   the socket for the server reply.
  425%
  426%   Calling this on an established connection has no effect.
  427%
  428%   @see http://stomp.github.io/stomp-specification-1.2.html#CONNECT_or_STOMP_Frame).
  429%   @error stomp_error(connect, Connection, Detail) if no connection
  430%   could be established.
  431
  432stomp_connect(Connection) :-
  433    stomp_connect(Connection, []).
  434
  435stomp_connect(Connection, Options) :-
  436    update_reconnect_property(Connection),
  437    stomp_deadline(Connection, Deadline, Options),
  438    stomp_deadline_connect(Connection, Deadline, Options).
  439
  440update_reconnect_property(Connection) :-
  441    query_connection_property(Connection, reconnect, disconnected),
  442    !,
  443    update_connection_property(Connection, reconnect, true).
  444update_reconnect_property(_).
  445
  446stomp_deadline_connect(Connection, Deadline, Options) :-
  447    between(0, infinite, Retry),
  448      stomp_setup(Connection, New, [deadline(Deadline)|Options]),
  449      (   New == true
  450      ->  Error = error(Formal, _),
  451          catch(connect_handshake(Connection), Error, true),
  452          (   var(Formal)
  453          ->  !
  454          ;   stomp_teardown(Connection),
  455              wait_retry(Connection, Error, Retry, Deadline)
  456          )
  457      ;   query_connection_property(Connection, connected, _)
  458      ->  true
  459      ;   wait_connected(Connection)
  460      ->  true
  461      ;   stomp_teardown(Connection),
  462          wait_retry(Connection, failed, Retry, Deadline)
  463      ).
  464
  465connect_handshake(Connection) :-
  466    query_connection_property(Connection, headers, Headers),
  467    query_connection_property(Connection, host, Host),
  468    send_frame(Connection,
  469               connect,
  470               Headers.put(_{ 'accept-version':'1.0,1.1,1.2',
  471                              host:Host
  472                            })),
  473    (   Heartbeat = Headers.get('heart-beat')
  474    ->  update_connection_property(Connection, 'heart-beat', Heartbeat)
  475    ;   true
  476    ),
  477    thread_self(Self),
  478    update_connection_property(Connection, waiting_thread, Self),
  479    (   thread_get_message(Self, stomp(connected(Connection, Status)),
  480                           [timeout(10)])
  481    ->  (   Status == true
  482        ->  get_time(Now),
  483            update_connection_property(Connection, connected, Now)
  484        ;   throw(error(stomp_error(connect, Connection, Status), _))
  485        )
  486    ;   throw(error(stomp_error(connect, Connection, timeout), _))
  487    ).
  488
  489wait_connected(Connection) :-
  490    thread_wait(query_connection_property(Connection, connected, _),
  491                [ timeout(10),
  492                  wait_preds([connection_property/3])
  493                ]),
  494    !.
  495wait_connected(Connection) :-
  496    throw(error(stomp_error(connect, Connection, timeout), _)).
  497
  498%!  stomp_deadline(+Connection, -Deadline, +Options) is det.
  499%
  500%   True when there is a connection timeout and Deadline is the deadline
  501%   for establishing a connection.  Deadline is one of
  502%
  503%     - Number
  504%       The deadline as a time stamp
  505%     - infinite
  506%       Keep trying
  507%     - once
  508%       Try to connect once.
  509
  510stomp_deadline(_Connection, Deadline, Options) :-
  511    option(deadline(Deadline), Options),
  512    !.
  513stomp_deadline(Connection, Deadline, Options) :-
  514    (   option(timeout(Time), Options)
  515    ;   query_connection_property(Connection, connect_timeout, Time)
  516    ),
  517    !,
  518    (   number(Time)
  519    ->  get_time(Now),
  520        Deadline is Now+Time
  521    ;   must_be(oneof([inf,infinite]), Time),
  522        Deadline = infinite
  523    ).
  524stomp_deadline(_, once, _).
  525
  526
  527%!  stomp_send(+Connection, +Destination, +Headers, +Body) is det.
  528%
  529%   Send  a  ``SEND``  frame.  If   ``content-type``  is  not  provided,
  530%   ``text/plain`` will be used. ``content-length``   will  be filled in
  531%   automatically.
  532%
  533%   @see http://stomp.github.io/stomp-specification-1.2.html#SEND
  534
  535stomp_send(Connection, Destination, Headers, Body) :-
  536    add_transaction(Headers, Headers1),
  537    send_frame(Connection, send, Headers1.put(destination, Destination), Body).
  538
  539%!  stomp_send_json(+Connection, +Destination, +Headers, +JSON) is det.
  540%
  541%   Send a ``SEND`` frame. ``JSON`` can be either a JSON term or a dict.
  542%   ``content-type`` is filled in  automatically as ``application/json``
  543%   and ``content-length`` will be filled in automatically as well.
  544%
  545%   @see http://stomp.github.io/stomp-specification-1.2.html#SEND
  546
  547stomp_send_json(Connection, Destination, Headers, JSON) :-
  548    add_transaction(Headers, Headers1),
  549    atom_json_term(Body, JSON,
  550                   [ as(string),
  551                     width(0)           % No layout for speed
  552                   ]),
  553    send_frame(Connection, send,
  554               Headers1.put(_{ destination:Destination,
  555                               'content-type':'application/json'
  556                             }),
  557               Body).
  558
  559%!  stomp_subscribe(+Connection, +Destination, +Id, +Headers) is det.
  560%
  561%   Send a ``SUBSCRIBE`` frame.
  562%
  563%   @see http://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE
  564
  565stomp_subscribe(Connection, Destination, Id, Headers) :-
  566    send_frame(Connection, subscribe,
  567               Headers.put(_{destination:Destination, id:Id})).
  568
  569%!  stomp_unsubscribe(+Connection, +Id) is det.
  570%
  571%   Send an ``UNSUBSCRIBE`` frame.
  572%
  573%   @see http://stomp.github.io/stomp-specification-1.2.html#UNSUBSCRIBE
  574
  575stomp_unsubscribe(Connection, Id) :-
  576    send_frame(Connection, unsubscribe, _{id:Id}).
  577
  578%!  stomp_ack(+Connection, +MessageId, +Headers) is det.
  579%
  580%   Send an ``ACK`` frame. See stomp_ack/2 for simply passing the header
  581%   received with the message we acknowledge.
  582%
  583%   @see http://stomp.github.io/stomp-specification-1.2.html#ACK
  584
  585stomp_ack(Connection, MessageId, Headers) :-
  586    send_frame(Connection, ack, Headers.put('message-id', MessageId)).
  587
  588%!  stomp_nack(+Connection, +MessageId, +Headers) is det.
  589%
  590%   Send a ``NACK`` frame.  See  stomp_nack/2   for  simply  passing the
  591%   header received with the message we acknowledge.
  592%
  593%   @see http://stomp.github.io/stomp-specification-1.2.html#NACK
  594
  595stomp_nack(Connection, MessageId, Headers) :-
  596    send_frame(Connection, nack, Headers.put('message-id', MessageId)).
  597
  598%!  stomp_ack(+Connection, +MsgHeader) is det.
  599%!  stomp_nack(+Connection, +MsgHeader) is det.
  600%
  601%   Reply with an ACK or NACK based on the received message header. On a
  602%   STOMP 1.1 request we get an `ack` field in the header and reply with
  603%   an  `id`.  For  STOMP  1.2  we   reply  with  the  `message-id`  and
  604%   `subscription` that we received with the message.
  605
  606stomp_ack(Connection, Header) :-
  607    stomp_ack_nack(Connection, ack, Header).
  608
  609stomp_nack(Connection, Header) :-
  610    stomp_ack_nack(Connection, nack, Header).
  611
  612stomp_ack_nack(Connection, Type, Header) :-
  613    (   Id = Header.get(ack)
  614    ->  send_frame(Connection, Type, _{id: Id})
  615    ;   Pass = _{'message-id':_, subscription:_},
  616        Pass :< Header
  617    ->  send_frame(Connection, Type, Pass)
  618    ).
  619
  620
  621%!  stomp_begin(+Connection, +Transaction) is det.
  622%
  623%   Send a ``BEGIN`` frame.
  624%   @see http://stomp.github.io/stomp-specification-1.2.html#BEGIN
  625
  626stomp_begin(Connection, Transaction) :-
  627    send_frame(Connection, begin, _{transaction:Transaction}).
  628
  629%!  stomp_commit(+Connection, +Transaction) is det.
  630%
  631%   Send a ``COMMIT`` frame.
  632%
  633%   @see http://stomp.github.io/stomp-specification-1.2.html#COMMIT
  634
  635stomp_commit(Connection, Transaction) :-
  636    send_frame(Connection, commit, _{transaction:Transaction}).
  637
  638%!  stomp_abort(+Connection, +Transaction) is det.
  639%
  640%   Send a ``ABORT`` frame.
  641%
  642%   @see http://stomp.github.io/stomp-specification-1.2.html#ABORT
  643
  644stomp_abort(Connection, Transaction) :-
  645    send_frame(Connection, abort, _{transaction:Transaction}).
  646
  647%!  stomp_transaction(+Connection, :Goal) is semidet.
  648%
  649%   Run Goal as  once/1,  tagging  all   ``SEND``  messages  inside  the
  650%   transaction with the transaction id.  If   Goal  fails  or raises an
  651%   exception the transaction is aborted.   Failure  or exceptions cause
  652%   the transaction to be aborted using   stomp_abort/2, after which the
  653%   result is forwarded.
  654
  655stomp_transaction(Connection, Goal) :-
  656    uuid(TransactionID),
  657    stomp_transaction(Connection, Goal, TransactionID).
  658
  659stomp_transaction(Connection, Goal, TransactionID) :-
  660    stomp_begin(Connection, TransactionID),
  661    (   catch(once(Goal), Error, true)
  662    ->  (   var(Error)
  663        ->  stomp_commit(Connection, TransactionID)
  664        ;   stomp_abort(Connection, TransactionID),
  665            throw(Error)
  666        )
  667    ;   stomp_abort(Connection, TransactionID),
  668        fail
  669    ).
  670
  671in_transaction(TransactionID) :-
  672    prolog_current_frame(Frame),
  673    prolog_frame_attribute(
  674        Frame, parent_goal,
  675        stomp_transaction(_Connection, _Goal, TransactionID)).
  676
  677add_transaction(Headers, Headers1) :-
  678    in_transaction(TransactionID),
  679    !,
  680    Headers1 = Headers.put(transaction, TransactionID).
  681add_transaction(Headers, Headers).
  682
  683
  684%!  stomp_disconnect(+Connection, +Headers) is det.
  685%
  686%   Send a ``DISCONNECT`` frame. If the   connection has the `reconnect`
  687%   property set to `true`, this  will   be  reset  to `disconnected` to
  688%   avoid  reconnecting.  A  subsequent    stomp_connect/2   resets  the
  689%   reconnect status.
  690%
  691%   @see http://stomp.github.io/stomp-specification-1.2.html#DISCONNECT
  692
  693stomp_disconnect(Connection, Headers) :-
  694    (   query_connection_property(Connection, reconnect, true)
  695    ->  update_connection_property(Connection, reconnect, disconnected)
  696    ;   true
  697    ),
  698    send_frame(Connection, disconnect, Headers).
  699
  700%!  send_frame(+Connection, +Command, +Headers) is det.
  701%!  send_frame(+Connection, +Command, +Headers, +Body) is det.
  702%
  703%   Send a frame. If no connection is  established connect first. If the
  704%   sending results in an I/O error, disconnect, reconnect and try again
  705%   if the `reconnect` propertys is set.
  706
  707send_frame(Connection, Command, Headers) :-
  708    send_frame(Connection, Command, Headers, "").
  709
  710send_frame(Connection, Command, Headers, Body) :-
  711    Error = error(Formal,_),
  712    catch(send_frame_guarded(Connection, Command, Headers, Body),
  713          Error,
  714          true),
  715    (   var(Formal)
  716    ->  true
  717    ;   query_connection_property(Connection, reconnect, true)
  718    ->  notify(Connection, disconnected),
  719        stomp_teardown(Connection),
  720        debug(stomp(connection), 'Sending thread reconnects', []),
  721        send_frame(Connection, Command, Headers, Body)
  722    ;   notify(Connection, disconnected),
  723        throw(Error)
  724    ).
  725
  726send_frame_guarded(Connection, Command, Headers, Body) :-
  727    has_body(Command),
  728    !,
  729    connection_stream(Connection, Stream),
  730    default_content_type('text/plain', Headers, Headers1),
  731    body_bytes(Body, ContentLength),
  732    Headers2 = Headers1.put('content-length', ContentLength),
  733    with_output_to(Stream,
  734                   ( send_command(Stream, Command),
  735                     send_header(Stream, Headers2),
  736                     format(Stream, '~w\u0000\n', [Body]),
  737                     flush_output(Stream))).
  738send_frame_guarded(Connection, heartbeat, _Headers, _Body) :-
  739    !,
  740    connection_stream(Connection, Stream),
  741    nl(Stream),
  742    flush_output(Stream).
  743send_frame_guarded(Connection, Command, Headers, _Body) :-
  744    connection_stream(Connection, Stream),
  745    with_output_to(Stream,
  746                   ( send_command(Stream, Command),
  747                     send_header(Stream, Headers),
  748                     format(Stream, '\u0000\n', []),
  749                     flush_output(Stream))).
  750
  751%!  connection_stream(+Connection, -Stream)
  752
  753connection_stream(Connection, Stream) :-
  754    query_connection_property(Connection, stream, Stream),
  755    !.
  756connection_stream(Connection, Stream) :-
  757    stomp_connect(Connection),
  758    query_connection_property(Connection, stream, Stream).
  759
  760send_command(Stream, Command) :-
  761    string_upper(Command, Upper),
  762    format(Stream, '~w\n', [Upper]).
  763
  764send_header(Stream, Headers) :-
  765    dict_pairs(Headers, _, Pairs),
  766    maplist(send_header_line(Stream), Pairs),
  767    nl(Stream).
  768
  769send_header_line(Stream, Name-Value) :-
  770    (   number(Value)
  771    ->  format(Stream, '~w:~w\n', [Name,Value])
  772    ;   escape_value(Value, String),
  773        format(Stream, '~w:~w\n', [Name,String])
  774    ).
  775
  776escape_value(Value, String) :-
  777    split_string(Value, "\n:\\", "", [_]),
  778    !,
  779    String = Value.
  780escape_value(Value, String) :-
  781    string_codes(Value, Codes),
  782    phrase(escape(Codes), Encoded),
  783    string_codes(String, Encoded).
  784
  785escape([]) --> [].
  786escape([H|T]) --> esc(H), escape(T).
  787
  788esc(0'\r) --> !, "\\r".
  789esc(0'\n) --> !, "\\n".
  790esc(0':)  --> !, "\\c".
  791esc(0'\\) --> !, "\\\\".
  792esc(C)    --> [C].
  793
  794default_content_type(ContentType, Header0, Header) :-
  795    (   get_dict('content-type', Header0, _)
  796    ->  Header = Header0
  797    ;   put_dict('content-type', Header0, ContentType, Header)
  798    ).
  799
  800body_bytes(String, Bytes) :-
  801    setup_call_cleanup(
  802        open_null_stream(Out),
  803        ( write(Out, String),
  804          byte_count(Out, Bytes)
  805        ),
  806        close(Out)).
  807
  808
  809		 /*******************************
  810		 *        INCOMING DATA		*
  811		 *******************************/
  812
  813%!  read_frame(+Connection, +Stream, -Frame) is det.
  814%
  815%   Read a frame from a STOMP stream.   On end-of-file, Frame is unified
  816%   with the atom `end_of_file`. Otherwise  it   is  a  dict holding the
  817%   `cmd`, `headers` (another dict) and `body` (a string)
  818
  819read_frame(Connection, Stream, Frame) :-
  820    read_command(Stream, Command),
  821    (   Command == end_of_file
  822    ->  Frame = end_of_file
  823    ;   Command == heartbeat
  824    ->  Frame = stomp_frame{cmd:heartbeat}
  825    ;   read_header(Stream, Header),
  826        (   has_body(Command)
  827        ->  read_content(Connection, Stream, Header, Content),
  828            Frame = stomp_frame{cmd:Command, headers:Header, body:Content}
  829        ;   Frame = stomp_frame{cmd:Command, headers:Header}
  830        )
  831    ).
  832
  833has_body(send).
  834has_body(message).
  835has_body(error).
  836
  837read_command(Stream, Command) :-
  838    read_line_to_string(Stream, String),
  839    debug(stomp(command), 'Got line ~p', [String]),
  840    (   String == end_of_file
  841    ->  Command = end_of_file
  842    ;   String == ""
  843    ->  Command = heartbeat
  844    ;   string_lower(String, Lwr),
  845        atom_string(Command, Lwr)
  846    ).
  847
  848read_header(Stream, Header) :-
  849    read_header_lines(Stream, Pairs, []),
  850    dict_pairs(Header, stomp_header, Pairs).
  851
  852read_header_lines(Stream, Header, Seen) :-
  853    read_line_to_string(Stream, Line),
  854    (   Line == ""
  855    ->  Header = []
  856    ;   sub_string(Line, Before, _, After, ":")
  857    ->  sub_atom(Line, 0, Before, _, Name),
  858        (   memberchk(Name, Seen)
  859        ->  read_header_lines(Stream, Header, Seen)
  860        ;   sub_string(Line, _, After, 0, Value0),
  861            convert_value(Name, Value0, Value),
  862            Header = [Name-Value|More],
  863            read_header_lines(Stream, More, [Name|Seen])
  864        )
  865    ).
  866
  867convert_value('content-length', String, Bytes) :-
  868    !,
  869    number_string(Bytes, String).
  870convert_value(_, String, Value) :-
  871    unescape_header_value(String, Value).
  872
  873unescape_header_value(String, Value) :-
  874    sub_atom(String, _, _, _, "\\"),
  875    !,
  876    string_codes(String, Codes),
  877    phrase(unescape(Plain), Codes),
  878    string_codes(Value, Plain).
  879unescape_header_value(String, String).
  880
  881unescape([H|T]) --> "\\", !, unesc(H), unescape(T).
  882unescape([H|T]) --> [H], !, unescape(T).
  883unescape([]) --> [].
  884
  885unesc(0'\r) --> "r", !.
  886unesc(0'\n) --> "n", !.
  887unesc(0':)  --> "c", !.
  888unesc(0'\\) --> "\\", !.
  889unesc(_) --> [C], { syntax_error(invalid_stomp_escape(C)) }.
  890
  891%!  read_content(+Connection, +Stream, +Header, -Content) is det.
  892%
  893%   Read the body. Note that the body   may  be followed by an arbitrary
  894%   number of newlines. We leave them in place to avoid blocking.
  895
  896read_content(Connection, Stream, Header, Content) :-
  897    _{ 'content-length':Bytes,
  898       'content-type':Type
  899     } :< Header,
  900    setup_call_cleanup(
  901        stream_range_open(Stream, DataStream, [size(Bytes)]),
  902        read_content(Connection, Type, DataStream, Header, Content),
  903        close(DataStream)).
  904
  905read_content(Connection, "application/json", Stream, _Header, Content) :-
  906    !,
  907    query_connection_property(Connection, json_options, Options),
  908    json_read_dict(Stream, Content, Options).
  909read_content(_Connection, _Type, Stream, _Header, Content) :-
  910    read_string(Stream, _, Content).
  911
  912
  913%!  receive(+Connection, +Stream) is det.
  914%
  915%   Read and process incoming messages from Stream.
  916
  917receive(Connection, Stream) :-
  918    E = error(Formal, _),
  919    catch(read_frame(Connection, Stream, Frame), E, true),
  920    !,
  921    (   var(Formal)
  922    ->  debug(stomp(receive), 'received frame ~p', [Frame]),
  923        (   Frame == end_of_file
  924        ->  receive_done(Connection, end_of_file)
  925        ;   process_frame(Connection, Frame),
  926            receive(Connection, Stream)
  927        )
  928    ;   receive_done(Connection, E)
  929    ).
  930receive(Connection, _Stream) :-
  931    receive_done(Connection, "failed to read frame").
  932
  933%!  receive_done(+Connection, +Why)
  934%
  935%   The receiver thread needs to  close   the  connection due to reading
  936%   end-of-file, an I/O error or failure to parse a frame. If connection
  937%   is configured to be restarted this   thread  will try to reestablish
  938%   the connection. After reestablishing the   connection  this receiver
  939%   thread terminates.
  940
  941receive_done(Connection, Why) :-
  942    (   Why = error(_,_)
  943    ->  print_message(warning, Why)
  944    ;   true
  945    ),
  946    notify(Connection, disconnected),
  947    stomp_teardown(Connection),
  948    (   query_connection_property(Connection, reconnect, true)
  949    ->  debug(stomp(connection), 'Receiver thread reconnects (~p)', [Why]),
  950        stomp_connect(Connection)
  951    ;   debug(stomp(connection), 'Receiver thread terminates (~p)', [Why])
  952    ),
  953    thread_self(Me),
  954    thread_detach(Me).
  955
  956%!  process_frame(+Connection, +Frame) is det.
  957%
  958%   Process an incoming frame.
  959
  960process_frame(Connection, Frame) :-
  961    Frame.cmd = heartbeat, !,
  962    get_time(Now),
  963    debug(stomp(heartbeat), 'received heartbeat at ~w', [Now]),
  964    update_connection_property(Connection, received_heartbeat, Now),
  965    notify(Connection, heartbeat, _{}, "").
  966process_frame(Connection, Frame) :-
  967    _{cmd:FrameType, headers:Headers, body:Body} :< Frame,
  968    !,
  969    process_connect(FrameType, Connection, Frame),
  970    notify(Connection, FrameType, Headers, Body).
  971process_frame(Connection, Frame) :-
  972    _{cmd:FrameType, headers:Headers} :< Frame,
  973    process_connect(FrameType, Connection, Frame),
  974    notify(Connection, FrameType, Headers).
  975
  976process_connect(connected, Connection, Frame) :-
  977    retract(connection_property(Connection, waiting_thread, Waiting)),
  978    !,
  979    thread_send_message(Waiting, stomp(connected(Connection, true))),
  980    start_heartbeat_if_required(Connection, Frame.headers).
  981process_connect(error, Connection, Frame) :-
  982    retract(connection_property(Connection, waiting_thread, Waiting)),
  983    !,
  984    thread_send_message(
  985        Waiting,
  986        stomp(connected(Connection, error(Frame.body)))).
  987process_connect(_, _, _).
  988
  989start_heartbeat_if_required(Connection, Headers) :-
  990    (   query_connection_property(Connection, 'heart-beat', CHB),
  991        SHB = Headers.get('heart-beat')
  992    ->  start_heartbeat(Connection, CHB, SHB)
  993    ;   true
  994    ).
  995
  996start_heartbeat(Connection, CHB, SHB) :-
  997    extract_heartbeats(CHB, CX, CY),
  998    extract_heartbeats(SHB, SX, SY),
  999    calculate_heartbeats(CX-CY, SX-SY, X-Y),
 1000    \+ (X =:= 0, Y =:= 0),
 1001    !,
 1002    debug(stomp(heartbeat), 'calculated heartbeats are ~p,~p', [X, Y]),
 1003    SendSleep is X / 1000,
 1004    ReceiveSleep is Y / 1000 + 2,
 1005    (   X =:= 0
 1006    ->  SleepTime = ReceiveSleep
 1007    ;   (   Y =:= 0
 1008        ->  SleepTime = SendSleep
 1009        ;   SleepTime is gcd(X, Y) / 2000
 1010        )
 1011    ),
 1012    get_time(Now),
 1013    gensym(stomp_heartbeat, Alias),
 1014    debug(stomp(heartbeat), 'Creating heartbeat thread (~p ~p ~p)',
 1015          [SendSleep, ReceiveSleep, SleepTime]),
 1016    thread_create(heartbeat_loop(Connection, SendSleep, ReceiveSleep,
 1017                                 SleepTime, Now, Now),
 1018                  HeartbeatThreadId, [alias(Alias)]),
 1019    update_connection_mapping(Connection,
 1020                              _{ heartbeat_thread_id:HeartbeatThreadId,
 1021                                 received_heartbeat:Now
 1022                               }).
 1023start_heartbeat(_, _, _).
 1024
 1025extract_heartbeats(Heartbeat, X, Y) :-
 1026    split_string(Heartbeat, ",", " ", [XS, YS]),
 1027    number_string(X, XS),
 1028    number_string(Y, YS).
 1029
 1030calculate_heartbeats(CX-CY, SX-SY, X-Y) :-
 1031    (   CX =\= 0, SY =\= 0
 1032    ->  X is max(CX, floor(SY))
 1033    ;   X = 0
 1034    ),
 1035    (   CY =\= 0, SX =\= 0
 1036    ->  Y is max(CY, floor(SX))
 1037    ;   Y = 0
 1038    ).
 1039
 1040heartbeat_loop(Connection, SendSleep, ReceiveSleep, SleepTime,
 1041               SendTime, ReceiveTime) :-
 1042    sleep(SleepTime),
 1043    get_time(Now),
 1044    (   Now - SendTime > SendSleep
 1045    ->  SendTime1 = Now,
 1046        debug(stomp(heartbeat), 'sending a heartbeat message at ~p', [Now]),
 1047        send_frame(Connection, heartbeat, _{})
 1048    ;   SendTime1 = SendTime
 1049    ),
 1050    (   Now - ReceiveTime > ReceiveSleep
 1051    ->  ReceiveTime1 = Now,
 1052        check_receive_heartbeat(Connection, Now, ReceiveSleep)
 1053    ;   ReceiveTime1 = ReceiveTime
 1054    ),
 1055    heartbeat_loop(Connection, SendSleep, ReceiveSleep, SleepTime,
 1056                   SendTime1, ReceiveTime1).
 1057
 1058check_receive_heartbeat(Connection, Now, ReceiveSleep) :-
 1059    query_connection_property(Connection, received_heartbeat, ReceivedHeartbeat),
 1060    DiffReceive is Now - ReceivedHeartbeat,
 1061    (   DiffReceive > ReceiveSleep
 1062    ->  debug(stomp(heartbeat),
 1063              'Heartbeat timeout: diff_receive=~p, time=~p, lastrec=~p',
 1064              [DiffReceive, Now, ReceivedHeartbeat]),
 1065        notify(Connection, heartbeat_timeout)
 1066    ;   true
 1067    ).
 1068
 1069%!  notify(+Connection, +FrameType) is det.
 1070%!  notify(+Connection, +FrameType, +Header) is det.
 1071%!  notify(+Connection, +FrameType, +Header, +Body) is det.
 1072%
 1073%   Call the callback using  FrameType.
 1074
 1075notify(Connection, FrameType) :-
 1076    notify(Connection, FrameType, stomp_header{cmd:FrameType}, "").
 1077
 1078notify(Connection, FrameType, Header) :-
 1079    notify(Connection, FrameType, Header, "").
 1080
 1081notify(Connection, FrameType, Header, Body) :-
 1082    query_connection_property(Connection, callback, Callback),
 1083    Error = error(Formal,_),
 1084    (   catch_with_backtrace(
 1085            call(Callback, FrameType, Connection, Header, Body),
 1086            Error, true)
 1087    ->  (   var(Formal)
 1088        ->  true
 1089        ;   print_message(warning, Error)
 1090        )
 1091    ;   true
 1092    ).
 1093
 1094update_connection_mapping(Connection, Dict) :-
 1095    dict_pairs(Dict, _, Pairs),
 1096    maplist(update_connection_property(Connection), Pairs).
 1097
 1098update_connection_property(Connection, Name-Value) :-
 1099    update_connection_property(Connection, Name, Value).
 1100
 1101update_connection_property(Connection, Name, Value) :-
 1102    transaction(update_connection_property_(Connection, Name, Value)).
 1103
 1104update_connection_property_(Connection, Name, Value) :-
 1105    retractall(connection_property(Connection, Name, _)),
 1106    asserta(connection_property(Connection, Name, Value)).
 1107
 1108query_connection_property(Connection, Name, Value) :-
 1109    (   nonvar(Name),
 1110        nonvar(Connection)
 1111    ->  connection_property(Connection, Name, Value),
 1112        !
 1113    ;   connection_property(Connection, Name, Value)
 1114    ).
 1115
 1116
 1117		 /*******************************
 1118		 *            MESSAGES		*
 1119		 *******************************/
 1120
 1121:- multifile prolog:error_message//1. 1122
 1123prolog:error_message(stomp_error(connect, Connection, error(Message))) -->
 1124    { connection_property(Connection, address, Address) },
 1125    [ 'STOMPL: Failed to connect to ~p: ~p'-[Address, Message] ].
 1126prolog:error_message(stomp_error(connect, Connection, Detail)) -->
 1127    { connection_property(Connection, address, Address) },
 1128    [ 'STOMPL: Failed to connect to ~p: ~p'-[Address, Detail] ]