View source with formatted comments or as raw
    1/*  Prolog Machine Query Interface
    2    Author:        Eric Zinda
    3    E-mail:        ericz@inductorsoftware.com
    4    WWW:           http://www.inductorsoftware.com
    5    Copyright (c)  2021, Eric Zinda
    6    All rights reserved.
    7
    8        Redistribution and use in source and binary forms, with or without
    9    modification, are permitted provided that the following conditions
   10    are met:
   11
   12    1. Redistributions of source code must retain the above copyright
   13       notice, this list of conditions and the following disclaimer.
   14
   15    2. Redistributions in binary form must reproduce the above copyright
   16       notice, this list of conditions and the following disclaimer in
   17       the documentation and/or other materials provided with the
   18       distribution.
   19
   20    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   21    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   22    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   23    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   24    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   25    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   26    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   27    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   28    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   29    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   30    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   31    POSSIBILITY OF SUCH DAMAGE.
   32*/
   33
   34:- module(mqi,
   35          [ mqi_start/0,
   36            mqi_start/1,                % +Options
   37            mqi_stop/1,                 % ?Thread
   38            mqi_version/2               % ?Major_Version, ?Minor_Version
   39          ]).   40
   41/**
   42  mqi_start(+Options:list) is semidet.
   43
   44Starts a Prolog Machine Query Interface ('MQI') using Options. The MQI is normally started automatically by a library built for a particular programming language such as the [`swiplserver` Python library](#mqi-python-installation), but starting manually can be useful when debugging Prolog code in some scenarios. See the documentation on ["Standalone Mode"](#mqi-standalone-mode) for more information.
   45
   46Once started, the MQI listens for TCP/IP or Unix Domain Socket connections and authenticates them using the password provided (or created depending on options) before processing any messages.  The messages processed by the MQI are described [below](#mqi-messages).
   47
   48For debugging, the server outputs traces using the `debug/3` predicate so that the server operation can be observed by using the `debug/1` predicate. Run the following commands to see them:
   49
   50- `debug(mqi(protocol))`: Traces protocol messages to show the flow of commands and connections.  It is designed to avoid filling the screen with large queries and results to make it easier to read.
   51- `debug(mqi(query))`: Traces messages that involve each query and its results. Therefore it can be quite verbose depending on the query.
   52
   53
   54__Options__
   55
   56Options is a list containing any combination of the following options. When used in the Prolog top level (i.e. in [Standalone Mode](#mqi-standalone-mode)), these are specified as normal Prolog options like this:
   57~~~
   58mqi_start([unix_domain_socket(Socket), password('a password')])
   59~~~
   60When using ["Embedded Mode"](#mqi-embedded-mode) they are passed using the same name but as normal command line arguments like this:
   61~~~
   62swipl mqi --write_connection_values=true
   63          --password="a password" --create_unix_domain_socket=true
   64~~~
   65
   66Note the use of quotes around values that could confuse command line
   67processing like spaces (e.g. "a password") and that
   68`unix_domain_socket(Variable)` is written as
   69=|--create_unix_domain_socket=true|= on the command line. See below for
   70more information.
   71
   72- port(?Port)
   73The TCP/IP port to bind to on localhost. This option is ignored if the `unix_domain_socket/1` option is set. Port is either a legal TCP/IP port number (integer) or a variable term like `Port`. If it is a variable, it causes the system to select a free port and unify the variable with the selected port as in `tcp_bind/2`. If the option `write_connection_values(true)` is set, the selected port is output to STDOUT followed by `\n` on startup to allow the client language library to retrieve it in ["Embedded Mode"](#mqi-embedded-mode).
   74
   75- unix_domain_socket(?Unix_Domain_Socket_Path_And_File)
   76If set, Unix Domain Sockets will be used as the way to communicate with the server. `Unix_Domain_Socket_Path_And_File` specifies the fully qualified path and filename to use for the socket.
   77
   78To have one generated instead (recommended), pass `Unix_Domain_Socket_Path_And_File` as a variable when calling from the Prolog top level and the variable will be unified with a created filename. If launching in ["Embedded Mode"](#mqi-embedded-mode), instead pass =|--create_unix_domain_socket=true|= since there isn't a way to specify variables from the command line. When generating the file, a temporary directory will be created using `tmp_file/2` and a socket file will be created within that directory following the below requirements.  If the directory and file are unable to be created for some reason, mqi_start/1 fails.
   79
   80Regardless of whether the file is specified or generated, if the option `write_connection_values(true)` is set, the fully qualified path to the generated file is output to STDOUT followed by `\n` on startup to allow the client language library to retrieve it.
   81
   82Specifying a file to use should follow the same guidelines as the generated file:
   83    - If the file exists when the MQI is launched, it will be deleted.
   84    - The Prolog process will attempt to create and, if Prolog exits cleanly, delete this file (and directory if it was created) when the MQI closes.  This means the directory from a specified file must have the appropriate permissions to allow the Prolog process to do so.
   85    - For security reasons, the filename should not be predictable and the directory it is contained in should have permissions set so that files created are only accessible to the current user.
   86    - The path must be below 92 *bytes* long (including null terminator) to be portable according to the Linux documentation.
   87
   88- password(?Password)
   89The password required for a connection. If not specified (recommended), the MQI will generate one as a Prolog string type since Prolog atoms are globally visible (be sure not to convert to an atom for this reason). If `Password` is a variable it will be unified with the created password. Regardless of whether the password is specified or generated, if the option `write_connection_values(true)` is set, the password is output to STDOUT followed by `\n` on startup to allow the client language library to retrieve it. This is the recommended way to integrate the MQI with a language as it avoids including the password as source code. This option is only included so that a known password can be supplied for when the MQI is running in Standalone Mode.
   90
   91- query_timeout(+Seconds)
   92Sets the default time in seconds that a query is allowed to run before it is cancelled. This can be overridden on a query by query basis. If not set, the default is no timeout (`-1`).
   93
   94- pending_connections(+Count)
   95Sets the number of pending connections allowed for the MQI as in `tcp_listen/2`. If not provided, the default is `5`.
   96
   97- run_server_on_thread(+Run_Server_On_Thread)
   98Determines whether `mqi_start/1` runs in the background on its own thread or blocks until the MQI shuts down.  Must be missing or set to `true` when running in ["Embedded Mode"](#mqi-embedded-mode) so that the SWI Prolog process can exit properly. If not set, the default is `true`.
   99
  100- server_thread(?Server_Thread)
  101Specifies or retrieves the name of the thread the MQI will run on if `run_server_on_thread(true)`. Passing in an atom for Server_Thread will only set the server thread name if run_server_on_thread(true).  If `Server_Thread` is a variable, it is unified with a generated name.
  102
  103- write_connection_values(+Write_Connection_Values)
  104Determines whether the server writes the port (or generated Unix Domain Socket) and password to STDOUT as it initializes. Used by language libraries to retrieve this information for connecting. If not set, the default is `false`.
  105
  106- write_output_to_file(+File)
  107Redirects STDOUT and STDERR to the file path specified.  Useful for debugging the MQI when it is being used in ["Embedded Mode"](#mqi-embedded-mode). If using multiple MQI instances in one SWI Prolog instance, only set this on the first one.  Each time it is set the output will be redirected.
  108
  109*/
  110:- use_module(library(socket)).  111:- use_module(library(http/json)).  112:- use_module(library(http/json_convert)).  113:- use_module(library(http/http_stream)).  114:- use_module(library(option)).  115:- use_module(library(term_to_json)).  116:- use_module(library(debug)).  117:- use_module(library(filesex)).  118:- use_module(library(gensym)).  119:- use_module(library(lists)).  120:- use_module(library(main)).  121:- use_module(library(make)).  122:- use_module(library(prolog_source)).  123:- use_module(library(time)).  124:- use_module(library(uuid)).  125
  126% One for every Machine Query Interface running
  127:- dynamic(mqi_thread/3).  128
  129% One for every active connection
  130:- dynamic(mqi_worker_threads/3).  131:- dynamic(mqi_socket/5).  132
  133% Indicates that a query is in progress on the goal thread or hasn't had its results drained
  134% Deleted once the last result from the queue has been drained
  135% Only deleted by the communication thread to avoid race conditions
  136:- dynamic(query_in_progress/1).  137
  138% Indicates to the communication thread that we are in a place
  139% that can be cancelled
  140:- dynamic(safe_to_cancel/1).  141
  142%!  mqi_version(?Major_Version, ?Minor_Version) is det.
  143%
  144%  Provides the major and minor version number of the protocol used by the MQI.
  145%  The protocol includes the message format and the messages that can
  146%  be sent and received from the MQI.
  147%
  148%  Note that the initial version of the MQI did not have a version predicate so
  149%  The proper way for callers to check the version is:
  150%
  151%  use_module(library(mqi)),
  152%  (   current_predicate(mqi_version/2)
  153%  ->  mqi_version(Major_Version, Minor_Version)
  154%  ;   Major_Version = 0, Minor_Version = 0
  155%  )
  156%
  157%  Major versions are increased when there is a change to the protocol that will
  158%  likely break clients written to the previous version. Minor versions are increased
  159%  when there is new functionality that will *not* break clients written to the old version
  160%
  161%  This allows a client written to MQI version 'Client_Major_Version.Client_Minor_Version'
  162%  to check for non-breaking compatibility like this:
  163%
  164%  Client_Major_Version = MQI_Major_Version and Client_Minor_Version <= MQI_Minor_Version
  165%
  166%  Breaking changes (i.e. Major version increments) should be very rare as the goal is to
  167%  have the broadest adoption possible.
  168%
  169%  Protocol Version History:
  170%  - 0.0 First published version. Had a protocol bug that required messages sent to MQI to
  171%        count Unicode code points instead of bytes for the message header.
  172%
  173%  - 1.0 Breaking change: Fixed protocol bug so that it properly accepted byte count instead of Unicode code point
  174%        count in the message header for messages sent to MQI.
  175mqi_version(1, 0).
  176
  177
  178% Password is carefully constructed to be a string (not an atom) so that it is not
  179% globally visible
  180% Add ".\n" to the password since it will be added by the message when received
  181mqi_start(Options) :-
  182    Encoding = utf8,
  183    option(pending_connections(Connection_Count), Options, 5),
  184    option(query_timeout(Query_Timeout), Options, -1),
  185    option(port(Port), Options, _),
  186    option(run_server_on_thread(Run_Server_On_Thread), Options, true),
  187    option(exit_main_on_failure(Exit_Main_On_Failure), Options, false),
  188    option(write_connection_values(Write_Connection_Values), Options, false),
  189    option(unix_domain_socket(Unix_Domain_Socket_Path_And_File), Options, _),
  190    (   (   memberchk(unix_domain_socket(_), Options),
  191            var(Unix_Domain_Socket_Path_And_File)
  192        )
  193    ->  unix_domain_socket_path(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File)
  194    ;   true
  195    ),
  196    option(server_thread(Server_Thread_ID), Options, _),
  197    (   var(Server_Thread_ID)
  198    ->  gensym(mqi, Server_Thread_ID)
  199    ;   true
  200    ),
  201    option(password(Password), Options, _),
  202    (   var(Password)
  203    ->  (   current_prolog_flag(bounded, false)
  204        ->  uuid(UUID, [format(integer)])
  205        ;   UUID is random(1<<62)
  206        ),
  207        format(string(Password), '~d', [UUID])
  208    ;   true
  209    ),
  210    string_concat(Password, '.\n', Final_Password),
  211    bind_socket(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Port, Socket, Client_Address),
  212    send_client_startup_data(Write_Connection_Values, user_output, Unix_Domain_Socket_Path_And_File, Client_Address, Password),
  213    option(write_output_to_file(File), Options, _),
  214    (   var(File)
  215    ->  true
  216    ;   write_output_to_file(File)
  217    ),
  218    Server_Goal = (
  219                    catch(server_thread(Server_Thread_ID, Socket, Client_Address, Final_Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure), error(E1, E2), true),
  220                    debug(mqi(protocol), "Stopped MQI on thread: ~w due to exception: ~w", [Server_Thread_ID, error(E1, E2)])
  221                 ),
  222    start_server_thread(Run_Server_On_Thread, Server_Thread_ID, Server_Goal, Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File).
  223
  224opt_type(port,                      port,                      natural).
  225opt_type(create_unix_domain_socket, create_unix_domain_socket, boolean).
  226opt_type(unix_domain_socket,        unix_domain_socket,        file(write)).
  227opt_type(password,                  password,                  string).
  228opt_type(pending_connections,       pending_connections,       nonneg).
  229opt_type(query_timeout,             query_timeout,             float).
  230opt_type(run_server_on_thread,      run_server_on_thread,      boolean).
  231opt_type(exit_main_on_failure,      exit_main_on_failure,      boolean).
  232opt_type(write_connection_values,   write_connection_values,   boolean).
  233opt_type(write_output_to_file,      write_output_to_file,      file(write)).
  234
  235opt_help(port,                      "TCP/IP port for clients to connect to").
  236opt_help(create_unix_domain_socket, "Create a Unix domain socket for clients to connect to").
  237opt_help(unix_domain_socket,        "File path for the Unix domain socket").
  238opt_help(password,                  "Connection password").
  239opt_help(pending_connections,       "Max number of queued connections (5)").
  240opt_help(query_timeout,             "Max query runtime in seconds (default infinite)").
  241opt_help(run_server_on_thread,      "Run server in a background thread (true)").
  242opt_help(exit_main_on_failure,      "Exit the process on a failure").
  243opt_help(write_connection_values,   "Print info for clients to connect").
  244opt_help(write_output_to_file,      "Write stdout and stderr to file").
  245
  246%!  mqi_start is semidet.
  247%
  248%  Main  entry  point  for  running  the   Machine  Query  Interface  in
  249%  ["Embedded Mode"](#mqi-embedded-mode) and designed to  be called from
  250%  the command line. Embedded Mode is   used  when launching the Machine
  251%  Query Interface as  an  embedded  part   of  another  language  (e.g.
  252%  Python).  Calling  mqi_start/0  from  Prolog   interactively  is  not
  253%  recommended as it depends on Prolog exiting  to stop the MQI, instead
  254%  use mqi_start/1 for interactive use.
  255%
  256%  To launch embedded mode:
  257%
  258%  ~~~
  259%  swipl mqi --write_connection_values=true
  260%  ~~~
  261%
  262%  This will start SWI Prolog and   invoke the mqi_start/0 predicate and
  263%  exit  the  process  when  that  predicate  stops.  Any  command  line
  264%  arguments after the standalone `--` will  be passed as Options. These
  265%  are the same Options that mqi_start/1 accepts   and  are passed to it
  266%  directly. Some options are expressed differently  due to command line
  267%  limitations, see mqi_start/1 Options for more information.
  268%
  269%  Any Option values that cause issues during command line parsing (such
  270%  as spaces) should be passed with =|""|= like this:
  271%
  272%  ~~~
  273%  swipl mqi --write_connection_values=true --password="HGJ SOWLWW"
  274%  ~~~
  275%
  276%  For help on commandline options run
  277%
  278%  ~~~
  279%  swipl mqi --help
  280%  ~~~
  281
  282
  283% Turn off int signal when running in embedded mode so the client language
  284% debugger signal doesn't put Prolog into debug mode
  285% run_server_on_thread must be missing or true (the default) so we can exit
  286% properly
  287% create_unix_domain_socket=true/false is only used as a command line argument
  288% since it doesn't seem possible to pass create_unix_domain_socket=_ on the command line
  289% and have it interpreted as a variable.
  290mqi_start :-
  291    current_prolog_flag(argv, Argv),
  292    argv_options(Argv, _Args, Options),
  293    merge_options(Options, [exit_main_on_failure(true)], Options1),
  294    select_option(create_unix_domain_socket(Create_Unix_Domain_Socket), Options1, Options2, false),
  295    (   Create_Unix_Domain_Socket == true
  296    ->  merge_options(Options2, [unix_domain_socket(_)], FinalOptions)
  297    ;   FinalOptions = Options2
  298    ),
  299    option(run_server_on_thread(Run_Server_On_Thread), FinalOptions, true),
  300    (   Run_Server_On_Thread == true
  301    ->  true
  302    ;   throw(domain_error(cannot_be_set_in_embedded_mode, run_server_on_thread))
  303    ),
  304    mqi_start(FinalOptions),
  305    on_signal(int, _, quit),
  306    thread_get_message(quit_mqi).
  307
  308
  309quit(_) :-
  310    thread_send_message(main, quit_mqi).
  311
  312
  313%! mqi_stop(?Server_Thread_ID:atom) is det.
  314%
  315% If `Server_Thread_ID` is a variable, stops all Machine Query Interfaces and associated threads.  If `Server_Thread_ID` is an atom, then only the MQI with that `Server_Thread_ID` is stopped. `Server_Thread_ID` can be provided or retrieved using `Options` in `mqi_start/1`.
  316%
  317% Always succeeds.
  318
  319% tcp_close_socket(Socket) will shut down the server thread cleanly so the socket is released and can be used again in the same session
  320% Closes down any pending connections using abort even if there were no matching server threads since the server thread could have died.
  321% At this point only threads associated with live connections (or potentially a goal thread that hasn't detected its missing communication thread)
  322% should be left so seeing abort warning messages in the console seems OK
  323mqi_stop(Server_Thread_ID) :-
  324    % First shut down any matching servers to stop new connections
  325    forall(retract(mqi_thread(Server_Thread_ID, _, Socket)),
  326        (
  327            debug(mqi(protocol), "Found server: ~w", [Server_Thread_ID]),
  328            catch(tcp_close_socket(Socket), Socket_Exception, true),
  329            abortSilentExit(Server_Thread_ID, Server_Thread_Exception),
  330            debug(mqi(protocol), "Stopped server thread: ~w, socket_close_exception(~w), stop_thread_exception(~w)", [Server_Thread_ID, Socket_Exception, Server_Thread_Exception])
  331        )),
  332    forall(retract(mqi_worker_threads(Server_Thread_ID, Communication_Thread_ID, Goal_Thread_ID)),
  333        (
  334            abortSilentExit(Communication_Thread_ID, CommunicationException),
  335            debug(mqi(protocol), "Stopped server: ~w communication thread: ~w, exception(~w)", [Server_Thread_ID, Communication_Thread_ID, CommunicationException]),
  336            abortSilentExit(Goal_Thread_ID, Goal_Exception),
  337            debug(mqi(protocol), "Stopped server: ~w goal thread: ~w, exception(~w)", [Server_Thread_ID, Goal_Thread_ID, Goal_Exception])
  338        )).
  339
  340
  341start_server_thread(Run_Server_On_Thread, Server_Thread_ID, Server_Goal, Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File) :-
  342    (   Run_Server_On_Thread
  343    ->  (   thread_create(Server_Goal, _, [ alias(Server_Thread_ID),
  344                                            at_exit((delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File),
  345                                                     detach_if_expected(Server_Thread_ID)
  346                                                    ))
  347                                          ]),
  348            debug(mqi(protocol), "Started server on thread: ~w", [Server_Thread_ID])
  349        )
  350    ;   (   Server_Goal,
  351            delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File),
  352            debug(mqi(protocol), "Halting.", [])
  353        )
  354    ).
  355
  356
  357% Unix domain sockets create a file that needs to be cleaned up
  358% If mqi generated it, there is also a directory that needs to be cleaned up
  359%   that will only contain that file
  360delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File) :-
  361    (   nonvar(Unix_Domain_Socket_Path)
  362    ->  catch(delete_directory_and_contents(Unix_Domain_Socket_Path), error(_, _), true)
  363    ;   (   nonvar(Unix_Domain_Socket_Path_And_File)
  364        ->  catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true)
  365        ;   true
  366        )
  367    ).
  368
  369:- if(current_predicate(unix_domain_socket/1)).  370    optional_unix_domain_socket(Socket) :-
  371        unix_domain_socket(Socket).
  372:- else.  373    optional_unix_domain_socket(_).
  374:- endif.  375
  376% Always bind only to localhost for security reasons
  377% Delete the socket file in case it is already around so that the same name can be reused
  378bind_socket(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Port, Socket, Client_Address) :-
  379    (   nonvar(Unix_Domain_Socket_Path_And_File)
  380    ->  debug(mqi(protocol), "Using Unix domain socket name: ~w", [Unix_Domain_Socket_Path_And_File]),
  381        optional_unix_domain_socket(Socket),
  382        catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true),
  383        tcp_bind(Socket, Unix_Domain_Socket_Path_And_File),
  384        Client_Address = Unix_Domain_Socket_Path_And_File
  385    ;   (   tcp_socket(Socket),
  386            tcp_setopt(Socket, reuseaddr),
  387            tcp_bind(Socket, '127.0.0.1':Port),
  388            debug(mqi(protocol), "Using TCP/IP port: ~w", ['127.0.0.1':Port]),
  389            Client_Address = Port
  390        )
  391    ),
  392    assert(mqi_thread(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Socket)).
  393
  394% Communicates the used port and password to the client via STDOUT so the client
  395% language library can use them to connect
  396send_client_startup_data(Write_Connection_Values, Stream, Unix_Domain_Socket_Path_And_File, Port, Password) :-
  397    (   Write_Connection_Values
  398    ->  (   (  var(Unix_Domain_Socket_Path_And_File)
  399            ->  format(Stream, "~d\n", [Port])
  400            ;   format(Stream, "~w\n", [Unix_Domain_Socket_Path_And_File])
  401            ),
  402            format(Stream, "~w\n", [Password]),
  403            flush_output(Stream)
  404        )
  405    ;   true
  406    ).
  407
  408
  409% Server thread worker predicate
  410% Listen for connections and create a connection for each in its own communication thread
  411% Uses tail recursion to ensure the stack doesn't grow
  412server_thread(Server_Thread_ID, Socket, Address, Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
  413    debug(mqi(protocol), "Listening on address: ~w", [Address]),
  414    tcp_listen(Socket, Connection_Count),
  415    tcp_open_socket(Socket, AcceptFd, _),
  416    create_connection(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure),
  417    server_thread(Server_Thread_ID, Socket, Address, Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure).
  418
  419
  420% Wait for the next connection and create communication and goal threads to support it
  421% Create known IDs for the threads so we can pass them along before the threads are created
  422% First create the goal thread to avoid a race condition where the communication
  423% thread tries to queue a goal before it is created
  424create_connection(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
  425    debug(mqi(protocol), "Waiting for client connection...", []),
  426    tcp_accept(AcceptFd, Socket, _Peer),
  427    debug(mqi(protocol), "Client connected", []),
  428    gensym('conn', Connection_Base),
  429    atomic_list_concat([Server_Thread_ID, "_", Connection_Base, '_comm'], Thread_Alias),
  430    atomic_list_concat([Server_Thread_ID, "_", Connection_Base, '_goal'], Goal_Alias),
  431    mutex_create(Goal_Alias, [alias(Goal_Alias)]),
  432    assert(mqi_worker_threads(Server_Thread_ID, Thread_Alias, Goal_Alias)),
  433    thread_create(goal_thread(Thread_Alias),
  434        _,
  435        [alias(Goal_Alias), at_exit(detach_if_expected(Goal_Alias))]),
  436    thread_create(communication_thread(Password, Socket, Encoding, Server_Thread_ID, Goal_Alias, Query_Timeout, Exit_Main_On_Failure),
  437        _,
  438        [alias(Thread_Alias), at_exit(detach_if_expected(Thread_Alias))]).
  439
  440
  441% The worker predicate for the Goal thread.
  442% Looks for a message from the connection thread, processes it, then recurses.
  443%
  444% Goals always run in the same thread in case the user is setting thread local information.
  445% For each answer to the user's query (including an exception), the goal thread will queue a message
  446% to the communication thread of the form result(Answer, Find_All), where Find_All == true if the user wants all answers at once
  447% Tail recurse to avoid growing the stack
  448goal_thread(Respond_To_Thread_ID) :-
  449    thread_self(Self_ID),
  450    throw_if_testing(Self_ID),
  451    thread_get_message(Self_ID, goal(Unexpanded_Goal, Binding_List, Query_Timeout, Find_All)),
  452    expand_goal(Unexpanded_Goal, Goal),
  453    debug(mqi(query), "Received Findall = ~w, Query_Timeout = ~w, binding list: ~w, unexpanded: ~w, goal: ~w", [Find_All, Query_Timeout, Binding_List, Unexpanded_Goal, Goal]),
  454    (   Find_All
  455    ->  One_Answer_Goal = findall(Binding_List, @(user:Goal, user), Answers)
  456    ;   One_Answer_Goal = ( findall(    One_Answer,
  457                                        (   @(user:Goal, user),
  458                                            One_Answer = [Binding_List],
  459                                            send_next_result(Respond_To_Thread_ID, One_Answer, _, Find_All)
  460                                        ),
  461                                        Answers
  462                            ),
  463                            (   Answers == []
  464                            ->  send_next_result(Respond_To_Thread_ID, [], _, Find_All)
  465                            ;   true
  466                            )
  467                          )
  468    ),
  469    Cancellable_Goal = run_cancellable_goal(Self_ID, One_Answer_Goal),
  470    (   Query_Timeout == -1
  471    ->  catch(Cancellable_Goal, Top_Exception, true)
  472    ;   catch(call_with_time_limit(Query_Timeout, Cancellable_Goal), Top_Exception, true)
  473    ),
  474    (   var(Top_Exception)
  475    ->  (   Find_All
  476        ->  send_next_result(Respond_To_Thread_ID, Answers, _, Find_All)
  477        ;   send_next_result(Respond_To_Thread_ID, [], no_more_results, Find_All)
  478        )
  479    ;   send_next_result(Respond_To_Thread_ID, [], Top_Exception, true)
  480    ),
  481    goal_thread(Respond_To_Thread_ID).
  482
  483
  484% Used only for testing unhandled exceptions outside of the "safe zone"
  485throw_if_testing(Self_ID) :-
  486    (   thread_peek_message(Self_ID, testThrow(Test_Exception))
  487    ->  (   debug(mqi(query), "TESTING: Throwing test exception: ~w", [Test_Exception]),
  488            throw(Test_Exception)
  489        )
  490    ;   true
  491    ).
  492
  493
  494% run_cancellable_goal handles the communication
  495% to ensure the cancel exception from the communication thread
  496% is injected at a place we are prepared to handle in the goal_thread
  497% Before the goal is run, sets a fact to indicate we are in the "safe to cancel"
  498% zone for the communication thread.
  499% Then it doesn't exit this "safe to cancel" zone if the
  500% communication thread is about to cancel
  501run_cancellable_goal(Mutex_ID, Goal) :-
  502    thread_self(Self_ID),
  503    setup_call_cleanup(
  504        assert(safe_to_cancel(Self_ID), Assertion),
  505        Goal,
  506        with_mutex(Mutex_ID, erase(Assertion))
  507    ).
  508
  509
  510% Worker predicate for the communication thread.
  511% Processes messages and sends goals to the goal thread.
  512% Continues processing messages until communication_thread_listen() throws or ends with true/false
  513%
  514% Catches all exceptions from communication_thread_listen so that it can do an orderly shutdown of the goal
  515%   thread if there is a communication failure.
  516%
  517% True means user explicitly called close or there was an exception
  518%   only exit the main thread if there was an exception and we are supposed to Exit_Main_On_Failure
  519%   otherwise just exit the session
  520communication_thread(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout, Exit_Main_On_Failure) :-
  521    thread_self(Self_ID),
  522    (   (
  523            catch(communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout), error(Serve_Exception1, Serve_Exception2), true),
  524            debug(mqi(protocol), "Session finished. Communication thread exception: ~w", [error(Serve_Exception1, Serve_Exception2)]),
  525            abortSilentExit(Goal_Thread_ID, _),
  526            retractall(mqi_worker_threads(Server_Thread_ID, Self_ID, Goal_Thread_ID))
  527        )
  528    ->  Halt = (nonvar(Serve_Exception1), Exit_Main_On_Failure)
  529    ;   Halt = true
  530    ),
  531    (   Halt
  532    ->  (   debug(mqi(protocol), "Ending session and halting Prolog server due to thread ~w: exception(~w)", [Self_ID, error(Serve_Exception1, Serve_Exception2)]),
  533            quit(_)
  534        )
  535    ;   (   debug(mqi(protocol), "Ending session ~w", [Self_ID]),
  536            catch(tcp_close_socket(Socket), error(_, _), true)
  537        )
  538    ).
  539
  540
  541% Open socket and begin processing the streams for a connection using the Encoding if the password matches
  542% true: session ended
  543% exception: communication failure or an internal failure (like a thread threw or shutdown unexpectedly)
  544% false: halt
  545communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout) :-
  546    tcp_open_socket(Socket, Read_Stream, Write_Stream),
  547    thread_self(Communication_Thread_ID),
  548    assert(mqi_socket(Server_Thread_ID, Communication_Thread_ID, Socket, Read_Stream, Write_Stream)),
  549    set_stream(Read_Stream, encoding(Encoding)),
  550    set_stream(Write_Stream, encoding(Encoding)),
  551    read_message(Read_Stream, Sent_Password),
  552    (   Password == Sent_Password
  553    ->  (   debug(mqi(protocol), "Password matched.", []),
  554            thread_self(Self_ID),
  555            mqi_version(Major, Minor),
  556            reply(Write_Stream, true([[threads(Self_ID, Goal_Thread_ID), version(Major, Minor)]]))
  557        )
  558    ;   (   debug(mqi(protocol), "Password mismatch, failing. ~w", [Sent_Password]),
  559            reply_error(Write_Stream, password_mismatch),
  560            throw(password_mismatch)
  561        )
  562    ),
  563    process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout),
  564    debug(mqi(protocol), "Session finished.", []).
  565
  566
  567% process_mqi_messages implements the main interface to the Machine Query Interface.
  568% Continuously reads a Machine Query Interface message from Read_Stream and writes a response to Write_Stream,
  569% until the connection fails or a `quit` or `close` message is sent.
  570%
  571% Read_Stream and Write_Stream can be any valid stream using any encoding.
  572%
  573% Goal_Thread_ID must be the threadID of a thread started on the goal_thread predicate
  574%
  575% uses tail recursion to ensure the stack doesn't grow
  576%
  577% true: indicates we should terminate the session (clean termination)
  578% false: indicates we should exit the process if running in embedded mode
  579% exception: indicates we should terminate the session (communication failure termination) or
  580%    thread was asked to halt
  581process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout) :-
  582    process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command),
  583    (   Command == close
  584    ->  (   debug(mqi(protocol), "Command: close. Client closed the connection cleanly.", []),
  585            true
  586        )
  587    ;   (   Command == quit
  588        ->  (   debug(mqi(protocol), "Command: quit.", []),
  589                false
  590            )
  591        ;
  592            process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout)
  593        )
  594    ).
  595
  596% process_mqi_message manages the protocol for the connection: receive message, parse it, process it.
  597% - Reads a single message from Read_Stream.
  598% - Processes it and issues a response on Write_Stream.
  599% - The message will be unified with Command to allow the caller to handle it.
  600%
  601% Read_Stream and Write_Stream can be any valid stream using any encoding.
  602%
  603% True if the message understood. A response will always be sent.
  604% False if the message was malformed.
  605% Exceptions will be thrown by the underlying stream if there are communication failures writing to Write_Stream or the thread was asked to exit.
  606%
  607% state_* predicates manage the state transitions of the protocol
  608% They only bubble up exceptions if there is a communication failure
  609%
  610% state_process_command will never return false
  611% since errors should be sent to the client
  612% It can throw if there are communication failures, though.
  613process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command) :-
  614    debug(mqi(protocol), "Waiting for next message ...", []),
  615    (   state_receive_raw_message(Read_Stream, Message_String)
  616    ->  (   state_parse_command(Write_Stream, Message_String, Command, Binding_List)
  617        ->  state_process_command(Write_Stream, Goal_Thread_ID, Query_Timeout, Command, Binding_List)
  618        ;   true
  619        )
  620    ;   false
  621    ).
  622
  623
  624% state_receive_raw_message: receive a raw message, which is simply a string
  625%   true: valid message received
  626%   false: invalid message format
  627%   exception: communication failure OR thread asked to exit
  628state_receive_raw_message(Read, Command_String) :-
  629    read_message(Read, Command_String),
  630    debug(mqi(protocol), "Valid message: ~w", [Command_String]).
  631
  632
  633% state_parse_command: attempt to parse the message string into a valid command
  634%
  635% Use read_term_from_atom instead of read_term(stream) so that we don't hang
  636% indefinitely if the caller didn't properly finish the term
  637% parse in the context of module 'user' to properly bind operators, do term expansion, etc
  638%
  639%   true: command could be parsed
  640%   false: command cannot be parsed.  An error is sent to the client in this case
  641%   exception: communication failure on sending a reply
  642state_parse_command(Write_Stream, Command_String, Parsed_Command, Binding_List) :-
  643    (   catch(read_term_from_atom(Command_String, Parsed_Command, [variable_names(Binding_List), module(user)]), Parse_Exception, true)
  644    ->  (   var(Parse_Exception)
  645        ->  debug(mqi(protocol), "Parse Success: ~w", [Parsed_Command])
  646        ;   (   reply_error(Write_Stream, Parse_Exception),
  647                fail
  648            )
  649        )
  650    ;   (   reply_error(Write_Stream, error(couldNotParseCommand, _)),
  651            fail
  652        )
  653    ).
  654
  655
  656% state_process_command(): execute the requested Command
  657%
  658% First wait until we have removed all results from any previous query.
  659% If query_in_progress(Goal_Thread_ID) exists then there is at least one
  660% more result to drain, by definition. Because the predicate is
  661% deleted by get_next_result in the communication thread when the last result is drained
  662%
  663%   true: if the command itself succeeded, failed or threw an exception.
  664%         In that case, the outcome is sent to the client
  665%   exception: only communication or thread failures are allowed to bubble up
  666% See mqi(Options) documentation
  667state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run(Goal, Timeout), Binding_List) :-
  668    !,
  669    debug(mqi(protocol), "Command: run/1. Timeout: ~w", [Timeout]),
  670    repeat_until_false((
  671            query_in_progress(Goal_Thread_ID),
  672            debug(mqi(protocol), "Draining unretrieved result for ~w", [Goal_Thread_ID]),
  673            heartbeat_until_result(Goal_Thread_ID, Stream, Unused_Answer),
  674            debug(mqi(protocol), "Drained result for ~w", [Goal_Thread_ID]),
  675            debug(mqi(query), "    Discarded answer: ~w", [Unused_Answer])
  676        )),
  677    debug(mqi(protocol), "All previous results drained", []),
  678    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, true),
  679    heartbeat_until_result(Goal_Thread_ID, Stream, Answers),
  680    reply_with_result(Goal_Thread_ID, Stream, Answers).
  681
  682
  683% See mqi(Options) documentation for documentation
  684% See notes in run(Goal, Timeout) re: draining previous query
  685state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run_async(Goal, Timeout, Find_All), Binding_List) :-
  686    !,
  687    debug(mqi(protocol), "Command: run_async/1.", []),
  688    debug(mqi(query),  "   Goal: ~w", [Goal]),
  689    repeat_until_false((
  690            query_in_progress(Goal_Thread_ID),
  691            debug(mqi(protocol), "Draining unretrieved result for ~w", [Goal_Thread_ID]),
  692            heartbeat_until_result(Goal_Thread_ID, Stream, Unused_Answer),
  693            debug(mqi(protocol), "Drained result for ~w", [Goal_Thread_ID]),
  694            debug(mqi(query), "    Discarded answer: ~w", [Unused_Answer])
  695            )),
  696    debug(mqi(protocol), "All previous results drained", []),
  697    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, Find_All),
  698    reply(Stream, true([[]])).
  699
  700
  701% See mqi(Options) documentation for documentation
  702state_process_command(Stream, Goal_Thread_ID, _, async_result(Timeout), _) :-
  703    !,
  704    debug(mqi(protocol), "Command: async_result, timeout: ~w.", [Timeout]),
  705    (   once((var(Timeout) ; Timeout == -1))
  706    ->  Options = []
  707    ;   Options = [timeout(Timeout)]
  708    ),
  709    (   query_in_progress(Goal_Thread_ID)
  710    ->  (   (   debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
  711                get_next_result(Goal_Thread_ID, Stream, Options, Result)
  712            )
  713        ->  reply_with_result(Goal_Thread_ID, Stream, Result)
  714        ;   reply_error(Stream, result_not_available)
  715        )
  716   ;    (   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
  717            reply_error(Stream, no_query)
  718        )
  719   ).
  720
  721
  722% See mqi(Options) documentation for documentation
  723% To ensure the goal thread is in a place it is safe to cancel,
  724% we lock a mutex first that the goal thread checks before exiting
  725% the "safe to cancel" zone.
  726% It is not in the safe zone: it either finished
  727% or was never running.
  728state_process_command(Stream, Goal_Thread_ID, _, cancel_async, _) :-
  729    !,
  730    debug(mqi(protocol), "Command: cancel_async/0.", []),
  731    with_mutex(Goal_Thread_ID, (
  732        (   safe_to_cancel(Goal_Thread_ID)
  733        ->  (   thread_signal(Goal_Thread_ID, throw(cancel_goal)),
  734                reply(Stream, true([[]]))
  735            )
  736        ;   (   query_in_progress(Goal_Thread_ID)
  737            ->  (   debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
  738                    reply(Stream, true([[]]))
  739                )
  740            ;   (   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
  741                    reply_error(Stream, no_query)
  742                )
  743            )
  744        )
  745    )).
  746
  747
  748% Used for testing how the system behaves when the goal thread is killed unexpectedly
  749% Needs to run a bogus command `run(true, -1)` to
  750% get the goal thread to process the exception
  751state_process_command(Stream, Goal_Thread_ID, Query_Timeout, testThrowGoalThread(Test_Exception), Binding_List) :-
  752    !,
  753    debug(mqi(protocol), "TESTING: requested goal thread unhandled exception", []),
  754    thread_send_message(Goal_Thread_ID, testThrow(Test_Exception)),
  755    state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run(true, -1), Binding_List).
  756
  757
  758state_process_command(Stream, _, _, close, _) :-
  759    !,
  760    reply(Stream, true([[]])).
  761
  762
  763state_process_command(Stream, _, _, quit, _) :-
  764    !,
  765    reply(Stream, true([[]])).
  766
  767
  768%  Send an exception if the command is not known
  769state_process_command(Stream, _, _, Command, _) :-
  770    debug(mqi(protocol), "Unknown command ~w", [Command]),
  771    reply_error(Stream, unknownCommand).
  772
  773
  774% Wait for a result (and put in Answers) from the goal thread, but send a heartbeat message
  775% every so often until it arrives to detect if the socket is broken.
  776% Throws if If the heartbeat failed which will
  777% and then shutdown the communication thread
  778% Tail recurse to not grow the stack
  779heartbeat_until_result(Goal_Thread_ID, Stream, Answers) :-
  780    (   get_next_result(Goal_Thread_ID, Stream, [timeout(2)], Answers)
  781    ->  debug(mqi(query), "Received answer from goal thread: ~w", [Answers])
  782    ;   (   debug(mqi(protocol), "heartbeat...", []),
  783            write_heartbeat(Stream),
  784            heartbeat_until_result(Goal_Thread_ID, Stream, Answers)
  785        )
  786    ).
  787
  788
  789% True if write succeeded, otherwise throws as that
  790% indicates that heartbeat failed because the other
  791% end of the pipe terminated
  792write_heartbeat(Stream) :-
  793    put_char(Stream, '.'),
  794    flush_output(Stream).
  795
  796
  797% Send a goal to the goal thread in its queue
  798%
  799% Remember that we are now running a query using assert.
  800%   This will be retracted once all the answers have been drained.
  801%
  802% If Goal_Thread_ID died, thread_send_message throws and, if we don't respond,
  803%   the client could hang so catch and give them a good message before propagating
  804%   the exception
  805send_goal_to_thread(Stream, Goal_Thread_ID, Default_Timeout, Timeout, Goal, Binding_List, Find_All) :-
  806    (   var(Timeout)
  807    ->  Timeout = Default_Timeout
  808    ;   true
  809    ),
  810    (   var(Binding_List)
  811    ->  Binding_List = []
  812    ;   true
  813    ),
  814    debug(mqi(query),  "Sending to goal thread with timeout = ~w: ~w", [Timeout, Goal]),
  815    assert(query_in_progress(Goal_Thread_ID)),
  816    catch(thread_send_message(Goal_Thread_ID, goal(Goal, Binding_List, Timeout, Find_All)), Send_Message_Exception, true),
  817    (   var(Send_Message_Exception)
  818    ->  true
  819    ;   (   reply_error(Stream, connection_failed),
  820            throw(Send_Message_Exception)
  821        )
  822    ).
  823
  824
  825% Send a result from the goal thread to the communication thread in its queue
  826send_next_result(Respond_To_Thread_ID, Answer, Exception_In_Goal, Find_All) :-
  827    (   var(Exception_In_Goal)
  828    ->  (   (   debug(mqi(query), "Sending result of goal to communication thread, Result: ~w", [Answer]),
  829                Answer == []
  830            )
  831        ->  thread_send_message(Respond_To_Thread_ID, result(false, Find_All))
  832        ;   handle_constraints(Answer, Final_Answer),
  833            thread_send_message(Respond_To_Thread_ID, result(true(Final_Answer), Find_All))
  834        )
  835    ;   (   debug(mqi(query), "Sending result of goal to communication thread, Exception: ~w", [Exception_In_Goal]),
  836            thread_send_message(Respond_To_Thread_ID, result(error(Exception_In_Goal), Find_All))
  837        )
  838    ).
  839
  840
  841handle_constraints(Answer, Final_Answer) :-
  842    (   term_attvars(Answer, [])
  843    ->  Final_Answer = Answer
  844    ;   findall(    Single_Answer_With_Attributes,
  845                    (   member(Single_Answer, Answer),
  846                        copy_term(Single_Answer, Single_Answer_Copy, Attributes),
  847                        append(['$residuals' = Attributes], Single_Answer_Copy, Single_Answer_With_Attributes)
  848                    ),
  849                    Final_Answer
  850        ),
  851        debug(mqi(query), "Constraints detected, converted: ~w to ~w", [Answer, Final_Answer])
  852    ).
  853
  854
  855% Gets the next result from the goal thread in the communication thread queue,
  856% and retracts query_in_progress/1 when the last result has been sent.
  857% Find_All == true only returns one message, so delete query_in_progress
  858% no matter what it is
  859% \+ Find_All: There may be more than one result. The first one we hit with any exception
  860% (note that no_more_results is also returned as an exception) means we are done
  861get_next_result(Goal_Thread_ID, Stream, Options, Answers) :-
  862    (   thread_property(Goal_Thread_ID, status(running))
  863    ->  true
  864    ;   (   reply_error(Stream, connection_failed),
  865            throw(connection_failed)
  866        )
  867    ),
  868    thread_self(Self_ID),
  869    thread_get_message(Self_ID, result(Answers, Find_All), Options),
  870    (   Find_All
  871    ->  (   debug(mqi(protocol), "Query completed and answers drained for findall ~w", [Goal_Thread_ID]),
  872            retractall(query_in_progress(Goal_Thread_ID))
  873        )
  874    ;   (   Answers = error(_)
  875        ->  (   debug(mqi(protocol), "Query completed and answers drained for non-findall ~w", [Goal_Thread_ID]),
  876                retractall(query_in_progress(Goal_Thread_ID))
  877            )
  878        ;   true
  879        )
  880    ).
  881
  882
  883% reply_with_result predicates are used to consistently return
  884% answers for a query from either run() or run_async()
  885reply_with_result(_, Stream, error(Error)) :-
  886    !,
  887    reply_error(Stream, Error).
  888
  889% Gracefully handle exceptions that can occur during conversion to JSON
  890reply_with_result(_, Stream, Result) :-
  891    !,
  892    catch(reply(Stream, Result), error(Exception, _), reply_with_result(_, Stream, error(Exception))).
  893
  894
  895% Reply with a normal term
  896% Convert term to an actual JSON string
  897reply(Stream, Term) :-
  898    debug(mqi(query), "Responding with Term: ~w", [Term]),
  899    term_to_json_string(Term, Json_String),
  900    write_message(Stream, Json_String).
  901
  902
  903% Special handling for exceptions since they can have parts that are not
  904% "serializable". Ensures they they are always returned in an exception/1 term
  905reply_error(Stream, Error_Term) :-
  906    debug(mqi(query), "Responding with exception: ~w", [Error_Term]),
  907    (   error(Error_Value, _) = Error_Term
  908    ->  Response = exception(Error_Value)
  909    ;   (   atom(Error_Term)
  910        ->
  911            Response = exception(Error_Term)
  912        ;   (   compound_name_arity(Error_Term, Name, _),
  913                Response = exception(Name)
  914            )
  915        )
  916    ),
  917    reply(Stream, Response).
  918
  919
  920% Send and receive messages are simply strings preceded by their length + ".\n"
  921% i.e. "<stringlength>.\n<string>"
  922% The desired encoding must be set on the Stream before calling this predicate
  923
  924
  925% Writes the next message.
  926% Throws if there is an unexpected exception
  927write_message(Stream, String) :-
  928    write_string_length(Stream, String),
  929    write(Stream, String),
  930    flush_output(Stream).
  931
  932
  933% Reads the next message.
  934% Throws if there is an unexpected exception or thread has been requested to quit
  935% the length passed must match the actual number of bytes in the stream
  936% in whatever encoding is being used
  937read_message(Stream, String) :-
  938    read_string_length(Stream, Length),
  939    stream_property(Stream, encoding(Encoding)),
  940    setup_call_cleanup(
  941         stream_range_open(Stream, Tmp, [size(Length)]),
  942         ( set_stream(Tmp, encoding(Encoding)),
  943           read_string(Tmp, _, String)
  944         ),
  945         close(Tmp)).
  946
  947
  948% Terminate with '.\n' so we know that's the end of the count
  949write_string_length(Stream, String) :-
  950    stream_property(Stream, encoding(Encoding)),
  951    string_encoding_length(String, Encoding, Length),
  952    format(Stream, "~d.\n", [Length]).
  953
  954
  955% Note: read_term requires ".\n" after the length
  956% ... but does not consume the "\n"
  957read_string_length(Stream, Length) :-
  958    read_term(Stream, Length, []),
  959    get_char(Stream, _).
  960
  961
  962% converts a string to Codes using Encoding
  963string_encoding_length(String, Encoding, Length) :-
  964    setup_call_cleanup(
  965        open_null_stream(Out),
  966        (   set_stream(Out, encoding(Encoding)),
  967            write(Out, String),
  968            byte_count(Out, Length)
  969        ),
  970        close(Out)).
  971
  972
  973% Convert Prolog Term to a Prolog JSON term
  974% Add a final \n so that using netcat to debug works well
  975term_to_json_string(Term, Json_String) :-
  976    term_to_json(Term, Json),
  977    with_output_to(string(Json_String),
  978        (   current_output(Stream),
  979            json_write(Stream, Json),
  980            put(Stream, '\n')
  981        )).
  982
  983
  984% Execute the goal as once() without binding any variables
  985% and keep executing it until it returns false (or throws)
  986repeat_until_false(Goal) :-
  987    (\+ (\+ Goal)), !, repeat_until_false(Goal).
  988repeat_until_false(_).
  989
  990
  991% Used to kill a thread in an "expected" way so it doesn't leave around traces in thread_property/2 afterwards
  992%
  993% If the thread is alive OR it was already aborted (expected cases) then attempt to join
  994%   the thread so that no warnings are sent to the console. Other cases leave the thread for debugging.
  995% There are some fringe cases (like calling external code)
  996%   where the call might not return for a long time.  Do a timeout for those cases.
  997abortSilentExit(Thread_ID, Exception) :-
  998    catch(thread_signal(Thread_ID, abort), error(Exception, _), true),
  999    debug(mqi(protocol), "Attempting to abort thread: ~w. thread_signal_exception: ~w", [Thread_ID, Exception]).
 1000% Workaround SWI Prolog bug: https://github.com/SWI-Prolog/swipl-devel/issues/852 by not joining
 1001% The workaround just stops joining the aborted thread, so an inert record will be left if thread_property/2 is called.
 1002%    ,
 1003%    (   once((var(Exception) ; catch(thread_property(Thread_ID, status(exception('$aborted'))), error(_, _), true)))
 1004%    ->  (   catch(call_with_time_limit(4, thread_join(Thread_ID)), error(JoinException1, JoinException2), true),
 1005%            debug(mqi(protocol), "thread_join attempted because thread: ~w exit was expected, exception: ~w", [Thread_ID, error(JoinException1, JoinException2)])
 1006%        )
 1007%    ;   true
 1008%    ).
 1009
 1010
 1011% Detach a thread that exits with true or false so that it doesn't leave around a record in thread_property/2 afterwards
 1012% Don't detach a thread if it exits because of an exception so we can debug using thread_property/2 afterwards
 1013%
 1014% However, `abort` is an expected exception but detaching a thread that aborts will leave an unwanted
 1015% thread_property/2 record *and* print a message to the console. To work around this,
 1016% the goal thread is always aborted by the communication thread using abortSilentExit.
 1017detach_if_expected(Thread_ID) :-
 1018    thread_property(Thread_ID, status(Status)),
 1019    debug(mqi(protocol), "Thread ~w exited with status ~w", [Thread_ID, Status]),
 1020    (   once((Status = true ; Status = false))
 1021    ->  (   debug(mqi(protocol), "Expected thread status, detaching thread ~w", [Thread_ID]),
 1022            thread_detach(Thread_ID)
 1023        )
 1024    ;   true
 1025    ).
 1026
 1027
 1028write_output_to_file(File) :-
 1029    debug(mqi(protocol), "Writing all STDOUT and STDERR to file:~w", [File]),
 1030    open(File, write, Stream, [buffer(false)]),
 1031    set_prolog_IO(user_input, Stream, Stream).
 1032
 1033
 1034% Creates a Unix Domain Socket file in a secured directory.
 1035% Throws if the directory or file cannot be created in /tmp for any reason
 1036% Requirements for this file are:
 1037%    - The Prolog process will attempt to create and, if Prolog exits cleanly,
 1038%           delete this file when the server closes.  This means the directory
 1039%           must have the appropriate permissions to allow the Prolog process
 1040%           to do so.
 1041%    - For security reasons, the filename should not be predictable and the
 1042%           directory it is contained in should have permissions set so that files
 1043%           created are only accessible to the current user.
 1044%    - The path must be below 92 *bytes* long (including null terminator) to
 1045%           be portable according to the Linux documentation
 1046%
 1047% tmp_file finds the right /tmp directory, even on Mac OS, so the path is small
 1048% Set 700 (rwx------)  permission so it is only accessible by current user
 1049% Create a secure tmp file in the new directory
 1050% {set,current}_prolog_flag is copied to a thread, so no need to use a mutex.
 1051% Close the stream so sockets can use it
 1052unix_domain_socket_path(Created_Directory, File_Path) :-
 1053    tmp_file(udsock, Created_Directory),
 1054    make_directory(Created_Directory),
 1055    catch(  chmod(Created_Directory, urwx),
 1056            Exception,
 1057            (   catch(delete_directory(Created_Directory), error(_, _), true),
 1058                throw(Exception)
 1059            )
 1060    ),
 1061    setup_call_cleanup( (   current_prolog_flag(tmp_dir, Save_Tmp_Dir),
 1062                            set_prolog_flag(tmp_dir, Created_Directory)
 1063                        ),
 1064                        tmp_file_stream(File_Path, Stream, []),
 1065                        set_prolog_flag(tmp_dir, Save_Tmp_Dir)
 1066                      ),
 1067    close(Stream)