View source with raw comments or as raw
    1:- encoding(utf8).
    2/*  Part of SWI-Prolog
    3
    4    Author:        Torbjörn Lager and Jan Wielemaker
    5    E-mail:        J.Wielemaker@vu.nl
    6    WWW:           http://www.swi-prolog.org
    7    Copyright (C): 2014-2019, Torbjörn Lager,
    8                              VU University Amsterdam
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(pengines,
   38          [ pengine_create/1,                   % +Options
   39            pengine_ask/3,                      % +Pengine, :Query, +Options
   40            pengine_next/2,                     % +Pengine. +Options
   41            pengine_stop/2,                     % +Pengine. +Options
   42            pengine_event/2,                    % -Event, +Options
   43            pengine_input/2,                    % +Prompt, -Term
   44            pengine_output/1,                   % +Term
   45            pengine_respond/3,                  % +Pengine, +Input, +Options
   46            pengine_debug/2,                    % +Format, +Args
   47            pengine_self/1,                     % -Pengine
   48            pengine_pull_response/2,            % +Pengine, +Options
   49            pengine_destroy/1,                  % +Pengine
   50            pengine_destroy/2,                  % +Pengine, +Options
   51            pengine_abort/1,                    % +Pengine
   52            pengine_application/1,              % +Application
   53            current_pengine_application/1,      % ?Application
   54            pengine_property/2,                 % ?Pengine, ?Property
   55            pengine_user/1,                     % -User
   56            pengine_event_loop/2,               % :Closure, +Options
   57            pengine_rpc/2,                      % +Server, :Goal
   58            pengine_rpc/3                       % +Server, :Goal, +Options
   59          ]).

Pengines: Web Logic Programming Made Easy

The library(pengines) provides an infrastructure for creating Prolog engines in a (remote) pengine server and accessing these engines either from Prolog or JavaScript.

author
- Torbjörn Lager and Jan Wielemaker */
   70:- use_module(library(http/http_dispatch)).   71:- use_module(library(http/http_parameters)).   72:- use_module(library(http/http_client)).   73:- use_module(library(http/http_json)).   74:- use_module(library(http/http_open)).   75:- use_module(library(http/http_stream)).   76:- use_module(library(http/http_wrapper)).   77:- use_module(library(http/http_cors)).   78:- use_module(library(thread_pool)).   79:- use_module(library(broadcast)).   80:- use_module(library(uri)).   81:- use_module(library(filesex)).   82:- use_module(library(time)).   83:- use_module(library(lists)).   84:- use_module(library(charsio)).   85:- use_module(library(apply)).   86:- use_module(library(aggregate)).   87:- use_module(library(option)).   88:- use_module(library(settings)).   89:- use_module(library(debug)).   90:- use_module(library(error)).   91:- use_module(library(sandbox)).   92:- use_module(library(modules)).   93:- use_module(library(term_to_json)).   94:- if(exists_source(library(uuid))).   95:- use_module(library(uuid)).   96:- endif.   97
   98
   99:- meta_predicate
  100    pengine_create(:),
  101    pengine_rpc(+, +, :),
  102    pengine_event_loop(1, +).  103
  104:- multifile
  105    write_result/3,                 % +Format, +Event, +Dict
  106    event_to_json/3,                % +Event, -JSON, +Format
  107    prepare_module/3,               % +Module, +Application, +Options
  108    prepare_goal/3,                 % +GoalIn, -GoalOut, +Options
  109    authentication_hook/3,          % +Request, +Application, -User
  110    not_sandboxed/2.                % +User, +App
  111
  112:- predicate_options(pengine_create/1, 1,
  113                     [ id(-atom),
  114                       alias(atom),
  115                       application(atom),
  116                       destroy(boolean),
  117                       server(atom),
  118                       ask(compound),
  119                       template(compound),
  120                       chunk(integer),
  121                       bindings(list),
  122                       src_list(list),
  123                       src_text(any),           % text
  124                       src_url(atom),
  125                       src_predicates(list)
  126                     ]).  127:- predicate_options(pengine_ask/3, 3,
  128                     [ template(any),
  129                       chunk(integer),
  130                       bindings(list)
  131                     ]).  132:- predicate_options(pengine_next/2, 2,
  133                     [ chunk(integer),
  134                       pass_to(pengine_send/3, 3)
  135                     ]).  136:- predicate_options(pengine_stop/2, 2,
  137                     [ pass_to(pengine_send/3, 3)
  138                     ]).  139:- predicate_options(pengine_respond/3, 2,
  140                     [ pass_to(pengine_send/3, 3)
  141                     ]).  142:- predicate_options(pengine_rpc/3, 3,
  143                     [ chunk(integer),
  144                       pass_to(pengine_create/1, 1)
  145                     ]).  146:- predicate_options(pengine_send/3, 3,
  147                     [ delay(number)
  148                     ]).  149:- predicate_options(pengine_event/2, 2,
  150                     [ pass_to(thread_get_message/3, 3)
  151                     ]).  152:- predicate_options(pengine_pull_response/2, 2,
  153                     [ pass_to(http_open/3, 3)
  154                     ]).  155:- predicate_options(pengine_event_loop/2, 2,
  156                     []).                       % not yet implemented
  157
  158% :- debug(pengine(transition)).
  159:- debug(pengine(debug)).               % handle pengine_debug in pengine_rpc/3.
  160
  161goal_expansion(random_delay, Expanded) :-
  162    (   debugging(pengine(delay))
  163    ->  Expanded = do_random_delay
  164    ;   Expanded = true
  165    ).
  166
  167do_random_delay :-
  168    Delay is random(20)/1000,
  169    sleep(Delay).
  170
  171:- meta_predicate                       % internal meta predicates
  172    solve(+, ?, 0, +),
  173    findnsols_no_empty(+, ?, 0, -),
  174    pengine_event_loop(+, 1, +).
 pengine_create(:Options) is det
Creates a new pengine. Valid options are:
id(-ID)
ID gets instantiated to the id of the created pengine. ID is atomic.
alias(+Name)
The pengine is named Name (an atom). A slave pengine (child) can subsequently be referred to by this name.
application(+Application)
Application in which the pengine runs. See pengine_application/1.
server(+URL)
The pengine will run in (and in the Prolog context of) the pengine server located at URL.
src_list(+List_of_clauses)
Inject a list of Prolog clauses into the pengine.
src_text(+Atom_or_string)
Inject the clauses specified by a source text into the pengine.
src_url(+URL)
Inject the clauses specified in the file located at URL into the pengine.
src_predicates(+List)
Send the local predicates denoted by List to the remote pengine. List is a list of predicate indicators.

Remaining options are passed to http_open/3 (meaningful only for non-local pengines) and thread_create/3. Note that for thread_create/3 only options changing the stack-sizes can be used. In particular, do not pass the detached or alias options..

Successful creation of a pengine will return an event term of the following form:

create(ID, Term)
ID is the id of the pengine that was created. Term is not used at the moment.

An error will be returned if the pengine could not be created:

error(ID, Term)
ID is invalid, since no pengine was created. Term is the exception's error term. */
  229pengine_create(M:Options0) :-
  230    translate_local_sources(Options0, Options, M),
  231    (   select_option(server(BaseURL), Options, RestOptions)
  232    ->  remote_pengine_create(BaseURL, RestOptions)
  233    ;   local_pengine_create(Options)
  234    ).
 translate_local_sources(+OptionsIn, -Options, +Module) is det
Translate the src_predicates and src_list options into src_text. We need to do that anyway for remote pengines. For local pengines, we could avoid this step, but there is very little point in transferring source to a local pengine anyway as local pengines can access any Prolog predicate that you make visible to the application.

Multiple sources are concatenated to end up with a single src_text option.

  248translate_local_sources(OptionsIn, Options, Module) :-
  249    translate_local_sources(OptionsIn, Sources, Options2, Module),
  250    (   Sources == []
  251    ->  Options = Options2
  252    ;   Sources = [Source]
  253    ->  Options = [src_text(Source)|Options2]
  254    ;   atomics_to_string(Sources, Source)
  255    ->  Options = [src_text(Source)|Options2]
  256    ).
  257
  258translate_local_sources([], [], [], _).
  259translate_local_sources([H0|T], [S0|S], Options, M) :-
  260    nonvar(H0),
  261    translate_local_source(H0, S0, M),
  262    !,
  263    translate_local_sources(T, S, Options, M).
  264translate_local_sources([H|T0], S, [H|T], M) :-
  265    translate_local_sources(T0, S, T, M).
  266
  267translate_local_source(src_predicates(PIs), Source, M) :-
  268    must_be(list, PIs),
  269    with_output_to(string(Source),
  270                   maplist(list_in_module(M), PIs)).
  271translate_local_source(src_list(Terms), Source, _) :-
  272    must_be(list, Terms),
  273    with_output_to(string(Source),
  274                   forall(member(Term, Terms),
  275                          format('~k .~n', [Term]))).
  276translate_local_source(src_text(Source), Source, _).
  277
  278list_in_module(M, PI) :-
  279    listing(M:PI).
 pengine_send(+NameOrID, +Term) is det
Same as pengine_send(NameOrID, Term, []). */
  286pengine_send(Target, Event) :-
  287    pengine_send(Target, Event, []).
 pengine_send(+NameOrID, +Term, +Options) is det
Succeeds immediately and places Term in the queue of the pengine NameOrID. Options is a list of options:
delay(+Time)
The actual sending is delayed by Time seconds. Time is an integer or a float.

Any remaining options are passed to http_open/3. */

  302pengine_send(Target, Event, Options) :-
  303    must_be(atom, Target),
  304    pengine_send2(Target, Event, Options).
  305
  306pengine_send2(self, Event, Options) :-
  307    !,
  308    thread_self(Queue),
  309    delay_message(queue(Queue), Event, Options).
  310pengine_send2(Name, Event, Options) :-
  311    child(Name, Target),
  312    !,
  313    delay_message(pengine(Target), Event, Options).
  314pengine_send2(Target, Event, Options) :-
  315    delay_message(pengine(Target), Event, Options).
  316
  317delay_message(Target, Event, Options) :-
  318    option(delay(Delay), Options),
  319    !,
  320    alarm(Delay,
  321          send_message(Target, Event, Options),
  322          _AlarmID,
  323          [remove(true)]).
  324delay_message(Target, Event, Options) :-
  325    random_delay,
  326    send_message(Target, Event, Options).
  327
  328send_message(queue(Queue), Event, _) :-
  329    thread_send_message(Queue, pengine_request(Event)).
  330send_message(pengine(Pengine), Event, Options) :-
  331    (   pengine_remote(Pengine, Server)
  332    ->  remote_pengine_send(Server, Pengine, Event, Options)
  333    ;   pengine_thread(Pengine, Thread)
  334    ->  thread_send_message(Thread, pengine_request(Event))
  335    ;   existence_error(pengine, Pengine)
  336    ).
 pengine_request(-Request) is det
To be used by a pengine to wait for the next request. Such messages are placed in the queue by pengine_send/2.
  343pengine_request(Request) :-
  344    pengine_self(Self),
  345    get_pengine_application(Self, Application),
  346    setting(Application:idle_limit, IdleLimit),
  347    thread_self(Me),
  348    (   thread_get_message(Me, pengine_request(Request), [timeout(IdleLimit)])
  349    ->  true
  350    ;   Request = destroy
  351    ).
 pengine_reply(+Event) is det
 pengine_reply(+Queue, +Event) is det
Reply Event to the parent of the current Pengine or the given Queue. Such events are read by the other side with pengine_event/1.

If the message cannot be sent within the idle_limit setting of the pengine, abort the pengine.

  364pengine_reply(Event) :-
  365    pengine_parent(Queue),
  366    pengine_reply(Queue, Event).
  367
  368pengine_reply(_Queue, _Event0) :-
  369    nb_current(pengine_idle_limit_exceeded, true),
  370    !.
  371pengine_reply(Queue, Event0) :-
  372    arg(1, Event0, ID),
  373    wrap_first_answer(ID, Event0, Event),
  374    random_delay,
  375    debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
  376    (   pengine_self(ID),
  377        \+ pengine_detached(ID, _)
  378    ->  get_pengine_application(ID, Application),
  379        setting(Application:idle_limit, IdleLimit),
  380        debug(pengine(reply), 'Sending ~p, timout: ~q', [Event, IdleLimit]),
  381        (   thread_send_message(Queue, pengine_event(ID, Event),
  382                                [ timeout(IdleLimit)
  383                                ])
  384        ->  true
  385        ;   thread_self(Me),
  386            debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
  387                  [ID, Me]),
  388            nb_setval(pengine_idle_limit_exceeded, true),
  389            thread_detach(Me),
  390            abort
  391        )
  392    ;   thread_send_message(Queue, pengine_event(ID, Event))
  393    ).
  394
  395wrap_first_answer(ID, Event0, CreateEvent) :-
  396    wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]),
  397    arg(1, CreateEvent, ID),
  398    !,
  399    retract(wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)])).
  400wrap_first_answer(_ID, Event, Event).
  401
  402
  403empty_queue :-
  404    pengine_parent(Queue),
  405    empty_queue(Queue, 0, Discarded),
  406    debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).
  407
  408empty_queue(Queue, C0, C) :-
  409    thread_get_message(Queue, _Term, [timeout(0)]),
  410    !,
  411    C1 is C0+1,
  412    empty_queue(Queue, C1, C).
  413empty_queue(_, C, C).
 pengine_ask(+NameOrID, @Query, +Options) is det
Asks pengine NameOrID a query Query.

Options is a list of options:

template(+Template)
Template is a variable (or a term containing variables) shared with the query. By default, the template is identical to the query.
chunk(+Integer)
Retrieve solutions in chunks of Integer rather than one by one. 1 means no chunking (default). Other integers indicate the maximum number of solutions to retrieve in one chunk.
bindings(+Bindings)
Sets the global variable '$variable_names' to a list of Name = Var terms, providing access to the actual variable names.

Any remaining options are passed to pengine_send/3.

Note that the predicate pengine_ask/3 is deterministic, even for queries that have more than one solution. Also, the variables in Query will not be bound. Instead, results will be returned in the form of event terms.

success(ID, Terms, Projection, Time, More)
ID is the id of the pengine that succeeded in solving the query. Terms is a list holding instantiations of Template. Projection is a list of variable names that should be displayed. Time is the CPU time used to produce the results and finally, More is either true or false, indicating whether we can expect the pengine to be able to return more solutions or not, would we call pengine_next/2.
failure(ID)
ID is the id of the pengine that failed for lack of a solutions.
error(ID, Term)
ID is the id of the pengine throwing the exception. Term is the exception's error term.
output(ID, Term)
ID is the id of a pengine running the query that called pengine_output/1. Term is the term that was passed in the first argument of pengine_output/1 when it was called.
prompt(ID, Term)
ID is the id of the pengine that called pengine_input/2 and Term is the prompt.

Defined in terms of pengine_send/3, like so:

pengine_ask(ID, Query, Options) :-
    partition(pengine_ask_option, Options, AskOptions, SendOptions),
    pengine_send(ID, ask(Query, AskOptions), SendOptions).

*/

  478pengine_ask(ID, Query, Options) :-
  479    partition(pengine_ask_option, Options, AskOptions, SendOptions),
  480    pengine_send(ID, ask(Query, AskOptions), SendOptions).
  481
  482
  483pengine_ask_option(template(_)).
  484pengine_ask_option(chunk(_)).
  485pengine_ask_option(bindings(_)).
  486pengine_ask_option(breakpoints(_)).
 pengine_next(+NameOrID, +Options) is det
Asks pengine NameOrID for the next solution to a query started by pengine_ask/3. Defined options are:
chunk(+Count)
Modify the chunk-size to Count before asking the next set of solutions.

Remaining options are passed to pengine_send/3. The result of re-executing the current goal is returned to the caller's message queue in the form of event terms.

success(ID, Terms, Projection, Time, More)
See pengine_ask/3.
failure(ID)
ID is the id of the pengine that failed for lack of more solutions.
error(ID, Term)
ID is the id of the pengine throwing the exception. Term is the exception's error term.
output(ID, Term)
ID is the id of a pengine running the query that called pengine_output/1. Term is the term that was passed in the first argument of pengine_output/1 when it was called.
prompt(ID, Term)
ID is the id of the pengine that called pengine_input/2 and Term is the prompt.

Defined in terms of pengine_send/3, as follows:

pengine_next(ID, Options) :-
    pengine_send(ID, next, Options).

*/

  530pengine_next(ID, Options) :-
  531    select_option(chunk(Count), Options, Options1),
  532    !,
  533    pengine_send(ID, next(Count), Options1).
  534pengine_next(ID, Options) :-
  535    pengine_send(ID, next, Options).
 pengine_stop(+NameOrID, +Options) is det
Tells pengine NameOrID to stop looking for more solutions to a query started by pengine_ask/3. Options are passed to pengine_send/3.

Defined in terms of pengine_send/3, like so:

pengine_stop(ID, Options) :-
    pengine_send(ID, stop, Options).

*/

  551pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
 pengine_abort(+NameOrID) is det
Aborts the running query. The pengine goes back to state `2', waiting for new queries.
See also
- pengine_destroy/1. */
  562pengine_abort(Name) :-
  563    (   child(Name, Pengine)
  564    ->  true
  565    ;   Pengine = Name
  566    ),
  567    (   pengine_remote(Pengine, Server)
  568    ->  remote_pengine_abort(Server, Pengine, [])
  569    ;   pengine_thread(Pengine, Thread),
  570        debug(pengine(abort), 'Signalling thread ~p', [Thread]),
  571        catch(thread_signal(Thread, throw(abort_query)), _, true)
  572    ).
 pengine_destroy(+NameOrID) is det
 pengine_destroy(+NameOrID, +Options) is det
Destroys the pengine NameOrID. With the option force(true), the pengine is killed using abort/0 and pengine_destroy/2 succeeds. */
  582pengine_destroy(ID) :-
  583    pengine_destroy(ID, []).
  584
  585pengine_destroy(Name, Options) :-
  586    (   child(Name, ID)
  587    ->  true
  588    ;   ID = Name
  589    ),
  590    option(force(true), Options),
  591    !,
  592    (   pengine_thread(ID, Thread)
  593    ->  catch(thread_signal(Thread, abort),
  594              error(existence_error(thread, _), _), true)
  595    ;   true
  596    ).
  597pengine_destroy(ID, _) :-
  598    catch(pengine_send(ID, destroy),
  599          error(existence_error(pengine, ID), _),
  600          retractall(child(_,ID))).
  601
  602
  603/*================= pengines administration =======================
  604*/
 current_pengine(?Id, ?Parent, ?Location)
Dynamic predicate that registers our known pengines. Id is an atomic unique datatype. Parent is the id of our parent pengine. Location is one of
  615:- dynamic
  616    current_pengine/6,              % Id, ParentId, Thread, URL, App, Destroy
  617    pengine_queue/4,                % Id, Queue, TimeOut, Time
  618    output_queue/3,                 % Id, Queue, Time
  619    pengine_user/2,                 % Id, User
  620    pengine_data/2,                 % Id, Data
  621    pengine_detached/2.             % Id, Data
  622:- volatile
  623    current_pengine/6,
  624    pengine_queue/4,
  625    output_queue/3,
  626    pengine_user/2,
  627    pengine_data/2,
  628    pengine_detached/2.  629
  630:- thread_local
  631    child/2.                        % ?Name, ?Child
 pengine_register_local(+Id, +Thread, +Queue, +URL, +App, +Destroy) is det
 pengine_register_remote(+Id, +URL, +Queue, +App, +Destroy) is det
 pengine_unregister(+Id) is det
  637pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
  638    asserta(current_pengine(Id, Queue, Thread, URL, Application, Destroy)).
  639
  640pengine_register_remote(Id, URL, Application, Destroy) :-
  641    thread_self(Queue),
  642    asserta(current_pengine(Id, Queue, 0, URL, Application, Destroy)).
 pengine_unregister(+Id)
Called by the pengine thread destruction. If we are a remote pengine thread, our URL equals http and the queue is the message queue used to send events to the HTTP workers.
  650pengine_unregister(Id) :-
  651    thread_self(Me),
  652    (   current_pengine(Id, Queue, Me, http, _, _)
  653    ->  with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue))
  654    ;   true
  655    ),
  656    retractall(current_pengine(Id, _, Me, _, _, _)),
  657    retractall(pengine_user(Id, _)),
  658    retractall(pengine_data(Id, _)).
  659
  660pengine_unregister_remote(Id) :-
  661    retractall(current_pengine(Id, _Parent, 0, _, _, _)).
 pengine_self(-Id) is det
True if the current thread is a pengine with Id.
  667pengine_self(Id) :-
  668    thread_self(Thread),
  669    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy).
  670
  671pengine_parent(Parent) :-
  672    nb_getval(pengine_parent, Parent).
  673
  674pengine_thread(Pengine, Thread) :-
  675    current_pengine(Pengine, _Parent, Thread, _URL, _Application, _Destroy),
  676    Thread \== 0,
  677    !.
  678
  679pengine_remote(Pengine, URL) :-
  680    current_pengine(Pengine, _Parent, 0, URL, _Application, _Destroy).
  681
  682get_pengine_application(Pengine, Application) :-
  683    current_pengine(Pengine, _Parent, _, _URL, Application, _Destroy),
  684    !.
  685
  686get_pengine_module(Pengine, Pengine).
  687
  688:- if(current_predicate(uuid/2)).  689pengine_uuid(Id) :-
  690    uuid(Id, [version(4)]).             % Version 4 is random.
  691:- else.  692:- use_module(library(random)).  693pengine_uuid(Id) :-
  694    Max is 1<<128,
  695    random_between(0, Max, Num),
  696    atom_number(Id, Num).
  697:- endif.
 pengine_application(+Application) is det
Directive that must be used to declare a pengine application module. The module must not be associated to any file. The default application is pengine_sandbox. The example below creates a new application address_book and imports the API defined in the module file adress_book_api.pl into the application.
:- pengine_application(address_book).
:- use_module(address_book:adress_book_api).

*/

  713pengine_application(Application) :-
  714    throw(error(context_error(nodirective,
  715                             pengine_application(Application)), _)).
  716
  717:- multifile
  718    system:term_expansion/2,
  719    current_application/1.
 current_pengine_application(?Application) is nondet
True when Application is a currently defined application.
See also
- pengine_application/1
  727current_pengine_application(Application) :-
  728    current_application(Application).
  729
  730
  731% Default settings for all applications
  732
  733:- setting(thread_pool_size, integer, 100,
  734           'Maximum number of pengines this application can run.').  735:- setting(thread_pool_stacks, list(compound), [],
  736           'Maximum stack sizes for pengines this application can run.').  737:- setting(slave_limit, integer, 3,
  738           'Maximum number of slave pengines a master pengine can create.').  739:- setting(time_limit, number, 300,
  740           'Maximum time to wait for output').  741:- setting(idle_limit, number, 300,
  742           'Pengine auto-destroys when idle for this time').  743:- setting(safe_goal_limit, number, 10,
  744           'Maximum time to try proving safety of the goal').  745:- setting(program_space, integer, 100_000_000,
  746           'Maximum memory used by predicates').  747:- setting(allow_from, list(atom), [*],
  748           'IP addresses from which remotes are allowed to connect').  749:- setting(deny_from, list(atom), [],
  750           'IP addresses from which remotes are NOT allowed to connect').  751:- setting(debug_info, boolean, false,
  752           'Keep information to support source-level debugging').  753
  754
  755system:term_expansion((:- pengine_application(Application)), Expanded) :-
  756    must_be(atom, Application),
  757    (   module_property(Application, file(_))
  758    ->  permission_error(create, pengine_application, Application)
  759    ;   true
  760    ),
  761    expand_term((:- setting(Application:thread_pool_size, integer,
  762                            setting(pengines:thread_pool_size),
  763                            'Maximum number of pengines this \c
  764                            application can run.')),
  765                ThreadPoolSizeSetting),
  766    expand_term((:- setting(Application:thread_pool_stacks, list(compound),
  767                            setting(pengines:thread_pool_stacks),
  768                            'Maximum stack sizes for pengines \c
  769                            this application can run.')),
  770                ThreadPoolStacksSetting),
  771    expand_term((:- setting(Application:slave_limit, integer,
  772                            setting(pengines:slave_limit),
  773                            'Maximum number of local slave pengines \c
  774                            a master pengine can create.')),
  775                SlaveLimitSetting),
  776    expand_term((:- setting(Application:time_limit, number,
  777                            setting(pengines:time_limit),
  778                            'Maximum time to wait for output')),
  779                TimeLimitSetting),
  780    expand_term((:- setting(Application:idle_limit, number,
  781                            setting(pengines:idle_limit),
  782                            'Pengine auto-destroys when idle for this time')),
  783                IdleLimitSetting),
  784    expand_term((:- setting(Application:safe_goal_limit, number,
  785                            setting(pengines:safe_goal_limit),
  786                            'Maximum time to try proving safety of the goal')),
  787                SafeGoalLimitSetting),
  788    expand_term((:- setting(Application:program_space, integer,
  789                            setting(pengines:program_space),
  790                            'Maximum memory used by predicates')),
  791                ProgramSpaceSetting),
  792    expand_term((:- setting(Application:allow_from, list(atom),
  793                            setting(pengines:allow_from),
  794                            'IP addresses from which remotes are allowed \c
  795                            to connect')),
  796                AllowFromSetting),
  797    expand_term((:- setting(Application:deny_from, list(atom),
  798                            setting(pengines:deny_from),
  799                            'IP addresses from which remotes are NOT \c
  800                            allowed to connect')),
  801                DenyFromSetting),
  802    expand_term((:- setting(Application:debug_info, boolean,
  803                            setting(pengines:debug_info),
  804                            'Keep information to support source-level \c
  805                            debugging')),
  806                DebugInfoSetting),
  807    flatten([ pengines:current_application(Application),
  808              ThreadPoolSizeSetting,
  809              ThreadPoolStacksSetting,
  810              SlaveLimitSetting,
  811              TimeLimitSetting,
  812              IdleLimitSetting,
  813              SafeGoalLimitSetting,
  814              ProgramSpaceSetting,
  815              AllowFromSetting,
  816              DenyFromSetting,
  817              DebugInfoSetting
  818            ], Expanded).
  819
  820% Register default application
  821
  822:- pengine_application(pengine_sandbox).
 pengine_property(?Pengine, ?Property) is nondet
True when Property is a property of the given Pengine. Enumerates all pengines that are known to the calling Prolog process. Defined properties are:
self(ID)
Identifier of the pengine. This is the same as the first argument, and can be used to enumerate all known pengines.
alias(Name)
Name is the alias name of the pengine, as provided through the alias option when creating the pengine.
thread(Thread)
If the pengine is a local pengine, Thread is the Prolog thread identifier of the pengine.
remote(Server)
If the pengine is remote, the URL of the server.
application(Application)
Pengine runs the given application
module(Module)
Temporary module used for running the Pengine.
destroy(Destroy)
Destroy is true if the pengines is destroyed automatically after completing the query.
parent(Queue)
Message queue to which the (local) pengine reports.
source(?SourceID, ?Source)
Source is the source code with the given SourceID. May be present if the setting debug_info is present.
detached(?Time)
Pengine was detached at Time. */
  859pengine_property(Id, Prop) :-
  860    nonvar(Id), nonvar(Prop),
  861    pengine_property2(Id, Prop),
  862    !.
  863pengine_property(Id, Prop) :-
  864    pengine_property2(Prop, Id).
  865
  866pengine_property2(self(Id), Id) :-
  867    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
  868pengine_property2(module(Id), Id) :-
  869    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
  870pengine_property2(alias(Alias), Id) :-
  871    child(Alias, Id),
  872    Alias \== Id.
  873pengine_property2(thread(Thread), Id) :-
  874    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy),
  875    Thread \== 0.
  876pengine_property2(remote(Server), Id) :-
  877    current_pengine(Id, _Parent, 0, Server, _Application, _Destroy).
  878pengine_property2(application(Application), Id) :-
  879    current_pengine(Id, _Parent, _Thread, _Server, Application, _Destroy).
  880pengine_property2(destroy(Destroy), Id) :-
  881    current_pengine(Id, _Parent, _Thread, _Server, _Application, Destroy).
  882pengine_property2(parent(Parent), Id) :-
  883    current_pengine(Id, Parent, _Thread, _URL, _Application, _Destroy).
  884pengine_property2(source(SourceID, Source), Id) :-
  885    pengine_data(Id, source(SourceID, Source)).
  886pengine_property2(detached(When), Id) :-
  887    pengine_detached(Id, When).
 pengine_output(+Term) is det
Sends Term to the parent pengine or thread. */
  894pengine_output(Term) :-
  895    pengine_self(Me),
  896    pengine_reply(output(Me, Term)).
 pengine_debug(+Format, +Args) is det
Create a message using format/3 from Format and Args and send this to the client. The default JavaScript client will call console.log(Message) if there is a console. The predicate pengine_rpc/3 calls debug(pengine(debug), '~w', [Message]). The debug topic pengine(debug) is enabled by default.
See also
- debug/1 and nodebug/1 for controlling the pengine(debug) topic
- format/2 for format specifications */
  911pengine_debug(Format, Args) :-
  912    pengine_parent(Queue),
  913    pengine_self(Self),
  914    catch(safe_goal(format(atom(_), Format, Args)), E, true),
  915    (   var(E)
  916    ->  format(atom(Message), Format, Args)
  917    ;   message_to_string(E, Message)
  918    ),
  919    pengine_reply(Queue, debug(Self, Message)).
  920
  921
  922/*================= Local pengine =======================
  923*/
 local_pengine_create(+Options)
Creates a local Pengine, which is a thread running pengine_main/2. It maintains two predicates:
  934local_pengine_create(Options) :-
  935    thread_self(Self),
  936    option(application(Application), Options, pengine_sandbox),
  937    create(Self, Child, Options, local, Application),
  938    option(alias(Name), Options, Child),
  939    assert(child(Name, Child)).
 thread_pool:create_pool(+Application) is det
On demand creation of a thread pool for a pengine application.
  946thread_pool:create_pool(Application) :-
  947    current_application(Application),
  948    setting(Application:thread_pool_size, Size),
  949    setting(Application:thread_pool_stacks, Stacks),
  950    thread_pool_create(Application, Size, Stacks).
 create(+Queue, -Child, +Options, +URL, +Application) is det
Create a new pengine thread.
Arguments:
Queue- is the queue (or thread handle) to report to
Child- is the identifier of the created pengine.
URL- is one of local or http
  960create(Queue, Child, Options, local, Application) :-
  961    !,
  962    pengine_child_id(Child),
  963    create0(Queue, Child, Options, local, Application).
  964create(Queue, Child, Options, URL, Application) :-
  965    pengine_child_id(Child),
  966    catch(create0(Queue, Child, Options, URL, Application),
  967          Error,
  968          create_error(Queue, Child, Error)).
  969
  970pengine_child_id(Child) :-
  971    (   nonvar(Child)
  972    ->  true
  973    ;   pengine_uuid(Child)
  974    ).
  975
  976create_error(Queue, Child, Error) :-
  977    pengine_reply(Queue, error(Child, Error)).
  978
  979create0(Queue, Child, Options, URL, Application) :-
  980    (  current_application(Application)
  981    -> true
  982    ;  existence_error(pengine_application, Application)
  983    ),
  984    (   URL \== http                    % pengine is _not_ a child of the
  985                                        % HTTP server thread
  986    ->  aggregate_all(count, child(_,_), Count),
  987        setting(Application:slave_limit, Max),
  988        (   Count >= Max
  989        ->  throw(error(resource_error(max_pengines), _))
  990        ;   true
  991        )
  992    ;   true
  993    ),
  994    partition(pengine_create_option, Options, PengineOptions, RestOptions),
  995    thread_create_in_pool(
  996        Application,
  997        pengine_main(Queue, PengineOptions, Application), ChildThread,
  998        [ at_exit(pengine_done)
  999        | RestOptions
 1000        ]),
 1001    option(destroy(Destroy), PengineOptions, true),
 1002    pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy),
 1003    thread_send_message(ChildThread, pengine_registered(Child)),
 1004    (   option(id(Id), Options)
 1005    ->  Id = Child
 1006    ;   true
 1007    ).
 1008
 1009pengine_create_option(src_text(_)).
 1010pengine_create_option(src_url(_)).
 1011pengine_create_option(application(_)).
 1012pengine_create_option(destroy(_)).
 1013pengine_create_option(ask(_)).
 1014pengine_create_option(template(_)).
 1015pengine_create_option(bindings(_)).
 1016pengine_create_option(chunk(_)).
 1017pengine_create_option(alias(_)).
 1018pengine_create_option(user(_)).
 pengine_done is det
Called from the pengine thread at_exit option. Destroys child pengines using pengine_destroy/1. Cleaning up the Pengine is synchronised by the pengine_done mutex. See read_event/6.
 1027:- public
 1028    pengine_done/0. 1029
 1030pengine_done :-
 1031    thread_self(Me),
 1032    (   thread_property(Me, status(exception('$aborted'))),
 1033        thread_detach(Me),
 1034        pengine_self(Pengine)
 1035    ->  catch(pengine_reply(destroy(Pengine, abort(Pengine))),
 1036              error(_,_), true)
 1037    ;   true
 1038    ),
 1039    forall(child(_Name, Child),
 1040           pengine_destroy(Child)),
 1041    pengine_self(Id),
 1042    with_mutex(pengine_done, pengine_unregister(Id)).
 pengine_main(+Parent, +Options, +Application)
Run a pengine main loop. First acknowledges its creation and run pengine_main_loop/1.
 1050:- thread_local wrap_first_answer_in_create_event/2. 1051
 1052:- meta_predicate
 1053    pengine_prepare_source(:, +). 1054
 1055pengine_main(Parent, Options, Application) :-
 1056    fix_streams,
 1057    thread_get_message(pengine_registered(Self)),
 1058    nb_setval(pengine_parent, Parent),
 1059    pengine_register_user(Options),
 1060    set_prolog_flag(mitigate_spectre, true),
 1061    catch(in_temporary_module(
 1062              Self,
 1063              pengine_prepare_source(Application, Options),
 1064              pengine_create_and_loop(Self, Application, Options)),
 1065          prepare_source_failed,
 1066          pengine_terminate(Self)).
 1067
 1068pengine_create_and_loop(Self, Application, Options) :-
 1069    setting(Application:slave_limit, SlaveLimit),
 1070    CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
 1071    (   option(ask(Query0), Options)
 1072    ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
 1073        (   string(Query0)                      % string is not callable
 1074        ->  (   option(template(TemplateS), Options)
 1075            ->  Ask2 = Query0-TemplateS
 1076            ;   Ask2 = Query0
 1077            ),
 1078            catch(ask_to_term(Ask2, Self, Query, Template, Bindings),
 1079                  Error, true),
 1080            (   var(Error)
 1081            ->  true
 1082            ;   send_error(Error),
 1083                throw(prepare_source_failed)
 1084            )
 1085        ;   Query = Query0,
 1086            option(template(Template), Options, Query),
 1087            option(bindings(Bindings), Options, [])
 1088        ),
 1089        option(chunk(Chunk), Options, 1),
 1090        pengine_ask(Self, Query,
 1091                    [ template(Template),
 1092                      chunk(Chunk),
 1093                      bindings(Bindings)
 1094                    ])
 1095    ;   Extra = [],
 1096        pengine_reply(CreateEvent)
 1097    ),
 1098    pengine_main_loop(Self).
 ask_to_term(+AskSpec, +Module, -Options, OptionsTail) is det
Translate the AskSpec into a query, template and bindings. The trick is that we must parse using the operator declarations of the source and we must make sure variable sharing between query and answer template are known.
 1108ask_to_term(Ask-Template, Module, Ask1, Template1, Bindings) :-
 1109    !,
 1110    format(string(AskTemplate), 't((~s),(~s))', [Template, Ask]),
 1111    term_string(t(Template1,Ask1), AskTemplate,
 1112                [ variable_names(Bindings0),
 1113                  module(Module)
 1114                ]),
 1115    phrase(template_bindings(Template1, Bindings0), Bindings).
 1116ask_to_term(Ask, Module, Ask1, Template, Bindings1) :-
 1117    term_string(Ask1, Ask,
 1118                [ variable_names(Bindings),
 1119                  module(Module)
 1120                ]),
 1121    exclude(anon, Bindings, Bindings1),
 1122    dict_create(Template, swish_default_template, Bindings1).
 1123
 1124template_bindings(Var, Bindings) -->
 1125    { var(Var) }, !,
 1126    (   { var_binding(Bindings, Var, Binding)
 1127        }
 1128    ->  [Binding]
 1129    ;   []
 1130    ).
 1131template_bindings([H|T], Bindings) -->
 1132    !,
 1133    template_bindings(H, Bindings),
 1134    template_bindings(T, Bindings).
 1135template_bindings(Compoound, Bindings) -->
 1136    { compound(Compoound), !,
 1137      compound_name_arguments(Compoound, _, Args)
 1138    },
 1139    template_bindings(Args, Bindings).
 1140template_bindings(_, _) --> [].
 1141
 1142var_binding(Bindings, Var, Binding) :-
 1143    member(Binding, Bindings),
 1144    arg(2, Binding, V),
 1145    V == Var, !.
 fix_streams is det
If we are a pengine that is created from a web server thread, the current output points to a CGI stream.
 1152fix_streams :-
 1153    fix_stream(current_output).
 1154
 1155fix_stream(Name) :-
 1156    is_cgi_stream(Name),
 1157    !,
 1158    debug(pengine(stream), '~w is a CGI stream!', [Name]),
 1159    set_stream(user_output, alias(Name)).
 1160fix_stream(_).
 pengine_prepare_source(:Application, +Options) is det
Load the source into the pengine's module.
throws
- prepare_source_failed if it failed to prepare the sources.
 1169pengine_prepare_source(Module:Application, Options) :-
 1170    setting(Application:program_space, SpaceLimit),
 1171    set_module(Module:program_space(SpaceLimit)),
 1172    delete_import_module(Module, user),
 1173    add_import_module(Module, Application, start),
 1174    catch(prep_module(Module, Application, Options), Error, true),
 1175    (   var(Error)
 1176    ->  true
 1177    ;   send_error(Error),
 1178        throw(prepare_source_failed)
 1179    ).
 1180
 1181prep_module(Module, Application, Options) :-
 1182    maplist(copy_flag(Module, Application), [var_prefix]),
 1183    forall(prepare_module(Module, Application, Options), true),
 1184    setup_call_cleanup(
 1185        '$set_source_module'(OldModule, Module),
 1186        maplist(process_create_option(Module), Options),
 1187        '$set_source_module'(OldModule)).
 1188
 1189copy_flag(Module, Application, Flag) :-
 1190    current_prolog_flag(Application:Flag, Value),
 1191    !,
 1192    set_prolog_flag(Module:Flag, Value).
 1193copy_flag(_, _, _).
 1194
 1195process_create_option(Application, src_text(Text)) :-
 1196    !,
 1197    pengine_src_text(Text, Application).
 1198process_create_option(Application, src_url(URL)) :-
 1199    !,
 1200    pengine_src_url(URL, Application).
 1201process_create_option(_, _).
 prepare_module(+Module, +Application, +Options) is semidet
Hook, called to initialize the temporary private module that provides the working context of a pengine. This hook is executed by the pengine's thread. Preparing the source consists of three steps:
  1. Add Application as (first) default import module for Module
  2. Call this hook
  3. Compile the source provided by the the src_text and src_url options
Arguments:
Module- is a new temporary module (see in_temporary_module/3) that may be (further) prepared by this hook.
Application- (also a module) associated to the pengine.
Options- is passed from the environment and should (currently) be ignored.
 1224pengine_main_loop(ID) :-
 1225    catch(guarded_main_loop(ID), abort_query, pengine_aborted(ID)).
 1226
 1227pengine_aborted(ID) :-
 1228    thread_self(Self),
 1229    debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Self]),
 1230    empty_queue,
 1231    destroy_or_continue(abort(ID)).
 guarded_main_loop(+Pengine) is det
Executes state `2' of the pengine, where it waits for two events:
destroy
Terminate the pengine
ask(:Goal, +Options)
Solve Goal.
 1244guarded_main_loop(ID) :-
 1245    pengine_request(Request),
 1246    (   Request = destroy
 1247    ->  debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
 1248        pengine_terminate(ID)
 1249    ;   Request = ask(Goal, Options)
 1250    ->  debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
 1251        ask(ID, Goal, Options)
 1252    ;   debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
 1253        pengine_reply(error(ID, error(protocol_error, _))),
 1254        guarded_main_loop(ID)
 1255    ).
 1256
 1257
 1258pengine_terminate(ID) :-
 1259    pengine_reply(destroy(ID)),
 1260    thread_self(Me),            % Make the thread silently disappear
 1261    thread_detach(Me).
 solve(+Chunk, +Template, :Goal, +ID) is det
Solve Goal. Note that because we can ask for a new goal in state `6', we must provide for an ancesteral cut (prolog_cut_to/1). We need to be sure to have a choice point before we can call prolog_current_choice/1. This is the reason why this predicate has two clauses.
 1272solve(Chunk, Template, Goal, ID) :-
 1273    prolog_current_choice(Choice),
 1274    State = count(Chunk),
 1275    statistics(cputime, Epoch),
 1276    Time = time(Epoch),
 1277    nb_current('$variable_names', Bindings),
 1278    filter_template(Template, Bindings, Template2),
 1279    '$current_typein_module'(CurrTypeIn),
 1280    (   '$set_typein_module'(ID),
 1281        call_cleanup(catch(findnsols_no_empty(State, Template2,
 1282                                              set_projection(Goal, Bindings),
 1283                                              Result),
 1284                           Error, true),
 1285                     query_done(Det, CurrTypeIn)),
 1286        arg(1, Time, T0),
 1287        statistics(cputime, T1),
 1288        CPUTime is T1-T0,
 1289        (   var(Error)
 1290        ->  projection(Projection),
 1291            (   var(Det)
 1292            ->  pengine_reply(success(ID, Result, Projection,
 1293                                      CPUTime, true)),
 1294                more_solutions(ID, Choice, State, Time)
 1295            ;   !,                      % commit
 1296                destroy_or_continue(success(ID, Result, Projection,
 1297                                            CPUTime, false))
 1298            )
 1299        ;   !,                          % commit
 1300            (   Error == abort_query
 1301            ->  throw(Error)
 1302            ;   destroy_or_continue(error(ID, Error))
 1303            )
 1304        )
 1305    ;   !,                              % commit
 1306        arg(1, Time, T0),
 1307        statistics(cputime, T1),
 1308        CPUTime is T1-T0,
 1309        destroy_or_continue(failure(ID, CPUTime))
 1310    ).
 1311solve(_, _, _, _).                      % leave a choice point
 1312
 1313query_done(true, CurrTypeIn) :-
 1314    '$set_typein_module'(CurrTypeIn).
 set_projection(:Goal, +Bindings)
findnsols/4 copies its goal and template to avoid instantiation thereof when it stops after finding N solutions. Using this helper we can a renamed version of Bindings that we can set.
 1323set_projection(Goal, Bindings) :-
 1324    b_setval('$variable_names', Bindings),
 1325    call(Goal).
 1326
 1327projection(Projection) :-
 1328    nb_current('$variable_names', Bindings),
 1329    !,
 1330    maplist(var_name, Bindings, Projection).
 1331projection([]).
 filter_template(+Template0, +Bindings, -Template) is det
Establish the final template. This is there because hooks such as goal_expansion/2 and the SWISH query hooks can modify the set of bindings.
bug
- Projection and template handling is pretty messy.
 1341filter_template(Template0, Bindings, Template) :-
 1342    is_dict(Template0, swish_default_template),
 1343    !,
 1344    dict_create(Template, swish_default_template, Bindings).
 1345filter_template(Template, _Bindings, Template).
 1346
 1347findnsols_no_empty(N, Template, Goal, List) :-
 1348    findnsols(N, Template, Goal, List),
 1349    List \== [].
 1350
 1351destroy_or_continue(Event) :-
 1352    arg(1, Event, ID),
 1353    (   pengine_property(ID, destroy(true))
 1354    ->  thread_self(Me),
 1355        thread_detach(Me),
 1356        pengine_reply(destroy(ID, Event))
 1357    ;   pengine_reply(Event),
 1358        garbage_collect,                % minimise our footprint
 1359        trim_stacks,
 1360        guarded_main_loop(ID)
 1361    ).
 more_solutions(+Pengine, +Choice, +State, +Time)
Called after a solution was found while there can be more. This is state `6' of the state machine. It processes these events:
stop
Go back via state `7' to state `2' (guarded_main_loop/1)
next
Fail. This causes solve/3 to backtrack on the goal asked, providing at most the current chunk solutions.
next(Count)
As next, but sets the new chunk-size to Count.
ask(Goal, Options)
Ask another goal. Note that we must commit the choice point of the previous goal asked for.
 1379more_solutions(ID, Choice, State, Time) :-
 1380    pengine_request(Event),
 1381    more_solutions(Event, ID, Choice, State, Time).
 1382
 1383more_solutions(stop, ID, _Choice, _State, _Time) :-
 1384    !,
 1385    debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
 1386    destroy_or_continue(stop(ID)).
 1387more_solutions(next, ID, _Choice, _State, Time) :-
 1388    !,
 1389    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
 1390    statistics(cputime, T0),
 1391    nb_setarg(1, Time, T0),
 1392    fail.
 1393more_solutions(next(Count), ID, _Choice, State, Time) :-
 1394    Count > 0,
 1395    !,
 1396    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
 1397    nb_setarg(1, State, Count),
 1398    statistics(cputime, T0),
 1399    nb_setarg(1, Time, T0),
 1400    fail.
 1401more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
 1402    !,
 1403    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
 1404    prolog_cut_to(Choice),
 1405    ask(ID, Goal, Options).
 1406more_solutions(destroy, ID, _Choice, _State, _Time) :-
 1407    !,
 1408    debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
 1409    pengine_terminate(ID).
 1410more_solutions(Event, ID, Choice, State, Time) :-
 1411    debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]),
 1412    pengine_reply(error(ID, error(protocol_error, _))),
 1413    more_solutions(ID, Choice, State, Time).
 ask(+Pengine, :Goal, +Options)
Migrate from state `2' to `3'. This predicate validates that it is safe to call Goal using safe_goal/1 and then calls solve/3 to prove the goal. It takes care of the chunk(N) option.
 1421ask(ID, Goal, Options) :-
 1422    catch(prepare_goal(ID, Goal, Goal1, Options), Error, true),
 1423    !,
 1424    (   var(Error)
 1425    ->  option(template(Template), Options, Goal),
 1426        option(chunk(N), Options, 1),
 1427        solve(N, Template, Goal1, ID)
 1428    ;   pengine_reply(error(ID, Error)),
 1429        guarded_main_loop(ID)
 1430    ).
 prepare_goal(+Pengine, +GoalIn, -GoalOut, +Options) is det
Prepare GoalIn for execution in Pengine. This implies we must perform goal expansion and, if the system is sandboxed, check the sandbox.

Note that expand_goal(Module:GoalIn, GoalOut) is what we'd like to write, but this does not work correctly if the user wishes to expand X:Y while interpreting X not as the module in which to run Y. This happens in the CQL package. Possibly we should disallow this reinterpretation?

 1444prepare_goal(ID, Goal0, Module:Goal, Options) :-
 1445    option(bindings(Bindings), Options, []),
 1446    b_setval('$variable_names', Bindings),
 1447    (   prepare_goal(Goal0, Goal1, Options)
 1448    ->  true
 1449    ;   Goal1 = Goal0
 1450    ),
 1451    get_pengine_module(ID, Module),
 1452    setup_call_cleanup(
 1453        '$set_source_module'(Old, Module),
 1454        expand_goal(Goal1, Goal),
 1455        '$set_source_module'(_, Old)),
 1456    (   pengine_not_sandboxed(ID)
 1457    ->  true
 1458    ;   get_pengine_application(ID, App),
 1459        setting(App:safe_goal_limit, Limit),
 1460        catch(call_with_time_limit(
 1461                  Limit,
 1462                  safe_goal(Module:Goal)), E, true)
 1463    ->  (   var(E)
 1464        ->  true
 1465        ;   E = time_limit_exceeded
 1466        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
 1467        ;   throw(E)
 1468        )
 1469    ).
 prepare_goal(+Goal0, -Goal1, +Options) is semidet
Pre-preparation hook for running Goal0. The hook runs in the context of the pengine. Goal is the raw goal given to ask. The returned Goal1 is subject to goal expansion (expand_goal/2) and sandbox validation (safe_goal/1) prior to execution. If this goal fails, Goal0 is used for further processing.
Arguments:
Options- provides the options as given to ask
 pengine_not_sandboxed(+Pengine) is semidet
True when pengine does not operate in sandboxed mode. This implies a user must be registered by authentication_hook/3 and the hook pengines:not_sandboxed(User, Application) must succeed.
 1489pengine_not_sandboxed(ID) :-
 1490    pengine_user(ID, User),
 1491    pengine_property(ID, application(App)),
 1492    not_sandboxed(User, App),
 1493    !.
 not_sandboxed(+User, +Application) is semidet
This hook is called to see whether the Pengine must be executed in a protected environment. It is only called after authentication_hook/3 has confirmed the authentity of the current user. If this hook succeeds, both loading the code and executing the query is executed without enforcing sandbox security. Typically, one should:
  1. Provide a safe user authentication hook.
  2. Enable HTTPS in the server or put it behind an HTTPS proxy and ensure that the network between the proxy and the pengine server can be trusted.
 pengine_pull_response(+Pengine, +Options) is det
Pulls a response (an event term) from the slave Pengine if Pengine is a remote process, else does nothing at all. */
 1515pengine_pull_response(Pengine, Options) :-
 1516    pengine_remote(Pengine, Server),
 1517    !,
 1518    remote_pengine_pull_response(Server, Pengine, Options).
 1519pengine_pull_response(_ID, _Options).
 pengine_input(+Prompt, -Term) is det
Sends Prompt to the master (parent) pengine and waits for input. Note that Prompt may be any term, compound as well as atomic. */
 1528pengine_input(Prompt, Term) :-
 1529    pengine_self(Self),
 1530    pengine_parent(Parent),
 1531    pengine_reply(Parent, prompt(Self, Prompt)),
 1532    pengine_request(Request),
 1533    (   Request = input(Input)
 1534    ->  Term = Input
 1535    ;   Request == destroy
 1536    ->  abort
 1537    ;   throw(error(protocol_error,_))
 1538    ).
 pengine_respond(+Pengine, +Input, +Options) is det
Sends a response in the form of the term Input to a slave (child) pengine that has prompted its master (parent) for input.

Defined in terms of pengine_send/3, as follows:

pengine_respond(Pengine, Input, Options) :-
    pengine_send(Pengine, input(Input), Options).

*/

 1555pengine_respond(Pengine, Input, Options) :-
 1556    pengine_send(Pengine, input(Input), Options).
 send_error(+Error) is det
Send an error to my parent. Remove non-readable blobs from the error term first using replace_blobs/2. If the error contains a stack-trace, this is resolved to a string before sending.
 1565send_error(error(Formal, context(prolog_stack(Frames), Message))) :-
 1566    is_list(Frames),
 1567    !,
 1568    with_output_to(string(Stack),
 1569                   print_prolog_backtrace(current_output, Frames)),
 1570    pengine_self(Self),
 1571    replace_blobs(Formal, Formal1),
 1572    replace_blobs(Message, Message1),
 1573    pengine_reply(error(Self, error(Formal1,
 1574                                    context(prolog_stack(Stack), Message1)))).
 1575send_error(Error) :-
 1576    pengine_self(Self),
 1577    replace_blobs(Error, Error1),
 1578    pengine_reply(error(Self, Error1)).
 replace_blobs(Term0, Term) is det
Copy Term0 to Term, replacing non-text blobs. This is required for error messages that may hold streams and other handles to non-readable objects.
 1586replace_blobs(Blob, Atom) :-
 1587    blob(Blob, Type), Type \== text,
 1588    !,
 1589    format(atom(Atom), '~p', [Blob]).
 1590replace_blobs(Term0, Term) :-
 1591    compound(Term0),
 1592    !,
 1593    compound_name_arguments(Term0, Name, Args0),
 1594    maplist(replace_blobs, Args0, Args),
 1595    compound_name_arguments(Term, Name, Args).
 1596replace_blobs(Term, Term).
 1597
 1598
 1599/*================= Remote pengines =======================
 1600*/
 1601
 1602
 1603remote_pengine_create(BaseURL, Options) :-
 1604    partition(pengine_create_option, Options, PengineOptions0, RestOptions),
 1605        (       option(ask(Query), PengineOptions0),
 1606                \+ option(template(_Template), PengineOptions0)
 1607        ->      PengineOptions = [template(Query)|PengineOptions0]
 1608        ;       PengineOptions = PengineOptions0
 1609        ),
 1610    options_to_dict(PengineOptions, PostData),
 1611    remote_post_rec(BaseURL, create, PostData, Reply, RestOptions),
 1612    arg(1, Reply, ID),
 1613    (   option(id(ID2), Options)
 1614    ->  ID = ID2
 1615    ;   true
 1616    ),
 1617    option(alias(Name), Options, ID),
 1618    assert(child(Name, ID)),
 1619    (   (   functor(Reply, create, _)   % actually created
 1620        ;   functor(Reply, output, _)   % compiler messages
 1621        )
 1622    ->  option(application(Application), PengineOptions, pengine_sandbox),
 1623        option(destroy(Destroy), PengineOptions, true),
 1624        pengine_register_remote(ID, BaseURL, Application, Destroy)
 1625    ;   true
 1626    ),
 1627    thread_self(Queue),
 1628    pengine_reply(Queue, Reply).
 1629
 1630options_to_dict(Options, Dict) :-
 1631    select_option(ask(Ask), Options, Options1),
 1632    select_option(template(Template), Options1, Options2),
 1633    !,
 1634    no_numbered_var_in(Ask+Template),
 1635    findall(AskString-TemplateString,
 1636            ask_template_to_strings(Ask, Template, AskString, TemplateString),
 1637            [ AskString-TemplateString ]),
 1638    options_to_dict(Options2, Dict0),
 1639    Dict = Dict0.put(_{ask:AskString,template:TemplateString}).
 1640options_to_dict(Options, Dict) :-
 1641    maplist(prolog_option, Options, Options1),
 1642    dict_create(Dict, _, Options1).
 1643
 1644no_numbered_var_in(Term) :-
 1645    sub_term(Sub, Term),
 1646    subsumes_term('$VAR'(_), Sub),
 1647    !,
 1648    domain_error(numbered_vars_free_term, Term).
 1649no_numbered_var_in(_).
 1650
 1651ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
 1652    numbervars(Ask+Template, 0, _),
 1653    WOpts = [ numbervars(true), ignore_ops(true), quoted(true) ],
 1654    format(string(AskTemplate), '~W\n~W', [ Ask, WOpts,
 1655                                            Template, WOpts
 1656                                          ]),
 1657    split_string(AskTemplate, "\n", "", [AskString, TemplateString]).
 1658
 1659prolog_option(Option0, Option) :-
 1660    create_option_type(Option0, term),
 1661    !,
 1662    Option0 =.. [Name,Value],
 1663    format(string(String), '~k', [Value]),
 1664    Option =.. [Name,String].
 1665prolog_option(Option, Option).
 1666
 1667create_option_type(ask(_),         term).
 1668create_option_type(template(_),    term).
 1669create_option_type(application(_), atom).
 1670
 1671remote_pengine_send(BaseURL, ID, Event, Options) :-
 1672    remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options),
 1673    thread_self(Queue),
 1674    pengine_reply(Queue, Reply).
 1675
 1676remote_pengine_pull_response(BaseURL, ID, Options) :-
 1677    remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options),
 1678    thread_self(Queue),
 1679    pengine_reply(Queue, Reply).
 1680
 1681remote_pengine_abort(BaseURL, ID, Options) :-
 1682    remote_send_rec(BaseURL, abort, ID, [], Reply, Options),
 1683    thread_self(Queue),
 1684    pengine_reply(Queue, Reply).
 remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
Issue a GET request on Server and unify Reply with the replied term.
 1691remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
 1692    !,
 1693    server_url(Server, Action, [id=ID], URL),
 1694    http_open(URL, Stream,              % putting this in setup_call_cleanup/3
 1695              [ post(prolog(Event))     % makes it impossible to interrupt.
 1696              | Options
 1697              ]),
 1698    call_cleanup(
 1699        read_prolog_reply(Stream, Reply),
 1700        close(Stream)).
 1701remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
 1702    server_url(Server, Action, [id=ID|Params], URL),
 1703    http_open(URL, Stream, Options),
 1704    call_cleanup(
 1705        read_prolog_reply(Stream, Reply),
 1706        close(Stream)).
 1707
 1708remote_post_rec(Server, Action, Data, Reply, Options) :-
 1709    server_url(Server, Action, [], URL),
 1710    probe(Action, URL),
 1711    http_open(URL, Stream,
 1712              [ post(json(Data))
 1713              | Options
 1714              ]),
 1715    call_cleanup(
 1716        read_prolog_reply(Stream, Reply),
 1717        close(Stream)).
 probe(+Action, +URL) is det
Probe the target. This is a good idea before posting a large document and be faced with an authentication challenge. Possibly we should make this an option for simpler scenarios.
 1725probe(create, URL) :-
 1726    !,
 1727    http_open(URL, Stream, [method(options)]),
 1728    close(Stream).
 1729probe(_, _).
 1730
 1731read_prolog_reply(In, Reply) :-
 1732    set_stream(In, encoding(utf8)),
 1733    read(In, Reply0),
 1734    rebind_cycles(Reply0, Reply).
 1735
 1736rebind_cycles(@(Reply, Bindings), Reply) :-
 1737    is_list(Bindings),
 1738    !,
 1739    maplist(bind, Bindings).
 1740rebind_cycles(Reply, Reply).
 1741
 1742bind(Var = Value) :-
 1743    Var = Value.
 1744
 1745server_url(Server, Action, Params, URL) :-
 1746    uri_components(Server, Components0),
 1747    uri_query_components(Query, Params),
 1748    uri_data(path, Components0, Path0),
 1749    atom_concat('pengine/', Action, PAction),
 1750    directory_file_path(Path0, PAction, Path),
 1751    uri_data(path, Components0, Path, Components),
 1752    uri_data(search, Components, Query),
 1753    uri_components(URL, Components).
 pengine_event(?EventTerm) is det
 pengine_event(?EventTerm, +Options) is det
Examines the pengine's event queue and if necessary blocks execution until a term that unifies to Term arrives in the queue. After a term from the queue has been unified to Term, the term is deleted from the queue.

Valid options are:

timeout(+Time)
Time is a float or integer and specifies the maximum time to wait in seconds. If no event has arrived before the time is up EventTerm is bound to the atom timeout.
listen(+Id)
Only listen to events from the pengine identified by Id. */
 1774pengine_event(Event) :-
 1775    pengine_event(Event, []).
 1776
 1777pengine_event(Event, Options) :-
 1778    thread_self(Self),
 1779    option(listen(Id), Options, _),
 1780    (   thread_get_message(Self, pengine_event(Id, Event), Options)
 1781    ->  true
 1782    ;   Event = timeout
 1783    ),
 1784    update_remote_destroy(Event).
 1785
 1786update_remote_destroy(Event) :-
 1787    destroy_event(Event),
 1788    arg(1, Event, Id),
 1789    pengine_remote(Id, _Server),
 1790    !,
 1791    pengine_unregister_remote(Id).
 1792update_remote_destroy(_).
 1793
 1794destroy_event(destroy(_)).
 1795destroy_event(destroy(_,_)).
 1796destroy_event(create(_,Features)) :-
 1797    memberchk(answer(Answer), Features),
 1798    !,
 1799    nonvar(Answer),
 1800    destroy_event(Answer).
 pengine_event_loop(:Closure, +Options) is det
Starts an event loop accepting event terms sent to the current pengine or thread. For each such event E, calls ignore(call(Closure, E)). A closure thus acts as a handler for the event. Some events are also treated specially:
create(ID, Term)
The ID is placed in a list of active pengines.
destroy(ID)
The ID is removed from the list of active pengines. When the last pengine ID is removed, the loop terminates.
output(ID, Term)
The predicate pengine_pull_response/2 is called.

Valid options are:

autoforward(+To)
Forwards received event terms to slaves. To is either all, all_but_sender or a Prolog list of NameOrIDs. [not yet implemented]

*/

 1829pengine_event_loop(Closure, Options) :-
 1830    child(_,_),
 1831    !,
 1832    pengine_event(Event),
 1833    (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
 1834    ->  forall(child(_,ID), pengine_send(ID, Event))
 1835    ;   true
 1836    ),
 1837    pengine_event_loop(Event, Closure, Options).
 1838pengine_event_loop(_, _).
 1839
 1840:- meta_predicate
 1841    pengine_process_event(+, 1, -, +). 1842
 1843pengine_event_loop(Event, Closure, Options) :-
 1844    pengine_process_event(Event, Closure, Continue, Options),
 1845    (   Continue == true
 1846    ->  pengine_event_loop(Closure, Options)
 1847    ;   true
 1848    ).
 1849
 1850pengine_process_event(create(ID, T), Closure, Continue, Options) :-
 1851    debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(T)]),
 1852    (   select(answer(First), T, T1)
 1853    ->  ignore(call(Closure, create(ID, T1))),
 1854        pengine_process_event(First, Closure, Continue, Options)
 1855    ;   ignore(call(Closure, create(ID, T))),
 1856        Continue = true
 1857    ).
 1858pengine_process_event(output(ID, Msg), Closure, true, _Options) :-
 1859    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]),
 1860    ignore(call(Closure, output(ID, Msg))),
 1861    pengine_pull_response(ID, []).
 1862pengine_process_event(debug(ID, Msg), Closure, true, _Options) :-
 1863    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]),
 1864    ignore(call(Closure, debug(ID, Msg))),
 1865    pengine_pull_response(ID, []).
 1866pengine_process_event(prompt(ID, Term), Closure, true, _Options) :-
 1867    debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]),
 1868    ignore(call(Closure, prompt(ID, Term))).
 1869pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :-
 1870    debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]),
 1871    ignore(call(Closure, success(ID, Sol, More))).
 1872pengine_process_event(failure(ID, _Time), Closure, true, _Options) :-
 1873    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]),
 1874    ignore(call(Closure, failure(ID))).
 1875pengine_process_event(error(ID, Error), Closure, Continue, _Options) :-
 1876    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]),
 1877    (   call(Closure, error(ID, Error))
 1878    ->  Continue = true
 1879    ;   forall(child(_,Child), pengine_destroy(Child)),
 1880        throw(Error)
 1881    ).
 1882pengine_process_event(stop(ID), Closure, true, _Options) :-
 1883    debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]),
 1884    ignore(call(Closure, stop(ID))).
 1885pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :-
 1886    pengine_process_event(Event, Closure, _, Options),
 1887    pengine_process_event(destroy(ID), Closure, Continue, Options).
 1888pengine_process_event(destroy(ID), Closure, true, _Options) :-
 1889    retractall(child(_,ID)),
 1890    debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]),
 1891    ignore(call(Closure, destroy(ID))).
 pengine_rpc(+URL, +Query) is nondet
 pengine_rpc(+URL, +Query, +Options) is nondet
Semantically equivalent to the sequence below, except that the query is executed in (and in the Prolog context of) the pengine server referred to by URL, rather than locally.
  copy_term_nat(Query, Copy),  % attributes are not copied to the server
  call(Copy),			 % executed on server at URL
  Query = Copy.

Valid options are:

chunk(+Integer)
Can be used to reduce the number of network roundtrips being made. See pengine_ask/3.
timeout(+Time)
Wait at most Time seconds for the next event from the server. The default is defined by the setting pengines:time_limit.

Remaining options (except the server option) are passed to pengine_create/1. */

 1920pengine_rpc(URL, Query) :-
 1921    pengine_rpc(URL, Query, []).
 1922
 1923pengine_rpc(URL, Query, M:Options0) :-
 1924    translate_local_sources(Options0, Options1, M),
 1925    (  option(timeout(_), Options1)
 1926    -> Options = Options1
 1927    ;  setting(time_limit, Limit),
 1928       Options = [timeout(Limit)|Options1]
 1929    ),
 1930    term_variables(Query, Vars),
 1931    Template =.. [v|Vars],
 1932    State = destroy(true),              % modified by process_event/4
 1933    setup_call_catcher_cleanup(
 1934        pengine_create([ ask(Query),
 1935                         template(Template),
 1936                         server(URL),
 1937                         id(Id)
 1938                       | Options
 1939                       ]),
 1940        wait_event(Template, State, [listen(Id)|Options]),
 1941        Why,
 1942        pengine_destroy_and_wait(State, Id, Why)).
 1943
 1944pengine_destroy_and_wait(destroy(true), Id, Why) :-
 1945    !,
 1946    debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
 1947    pengine_destroy(Id),
 1948    wait_destroy(Id, 10).
 1949pengine_destroy_and_wait(_, _, Why) :-
 1950    debug(pengine(rpc), 'Not destroying RPC (~p)', [Why]).
 1951
 1952wait_destroy(Id, _) :-
 1953    \+ child(_, Id),
 1954    !.
 1955wait_destroy(Id, N) :-
 1956    pengine_event(Event, [listen(Id),timeout(10)]),
 1957    !,
 1958    (   destroy_event(Event)
 1959    ->  retractall(child(_,Id))
 1960    ;   succ(N1, N)
 1961    ->  wait_destroy(Id, N1)
 1962    ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
 1963        pengine_unregister_remote(Id),
 1964        retractall(child(_,Id))
 1965    ).
 1966
 1967wait_event(Template, State, Options) :-
 1968    pengine_event(Event, Options),
 1969    debug(pengine(event), 'Received ~p', [Event]),
 1970    process_event(Event, Template, State, Options).
 1971
 1972process_event(create(_ID, Features), Template, State, Options) :-
 1973    memberchk(answer(First), Features),
 1974    process_event(First, Template, State, Options).
 1975process_event(error(_ID, Error), _Template, _, _Options) :-
 1976    throw(Error).
 1977process_event(failure(_ID, _Time), _Template, _, _Options) :-
 1978    fail.
 1979process_event(prompt(ID, Prompt), Template, State, Options) :-
 1980    pengine_rpc_prompt(ID, Prompt, Reply),
 1981    pengine_send(ID, input(Reply)),
 1982    wait_event(Template, State, Options).
 1983process_event(output(ID, Term), Template, State, Options) :-
 1984    pengine_rpc_output(ID, Term),
 1985    pengine_pull_response(ID, Options),
 1986    wait_event(Template, State, Options).
 1987process_event(debug(ID, Message), Template, State, Options) :-
 1988    debug(pengine(debug), '~w', [Message]),
 1989    pengine_pull_response(ID, Options),
 1990    wait_event(Template, State, Options).
 1991process_event(success(_ID, Solutions, _Proj, _Time, false),
 1992              Template, _, _Options) :-
 1993    !,
 1994    member(Template, Solutions).
 1995process_event(success(ID, Solutions, _Proj, _Time, true),
 1996              Template, State, Options) :-
 1997    (   member(Template, Solutions)
 1998    ;   pengine_next(ID, Options),
 1999        wait_event(Template, State, Options)
 2000    ).
 2001process_event(destroy(ID, Event), Template, State, Options) :-
 2002    !,
 2003    retractall(child(_,ID)),
 2004    nb_setarg(1, State, false),
 2005    debug(pengine(destroy), 'State: ~p~n', [State]),
 2006    process_event(Event, Template, State, Options).
 2007% compatibility with older versions of the protocol.
 2008process_event(success(ID, Solutions, Time, More),
 2009              Template, State, Options) :-
 2010    process_event(success(ID, Solutions, _Proj, Time, More),
 2011                  Template, State, Options).
 2012
 2013
 2014pengine_rpc_prompt(ID, Prompt, Term) :-
 2015    prompt(ID, Prompt, Term0),
 2016    !,
 2017    Term = Term0.
 2018pengine_rpc_prompt(_ID, Prompt, Term) :-
 2019    setup_call_cleanup(
 2020        prompt(Old, Prompt),
 2021        read(Term),
 2022        prompt(_, Old)).
 2023
 2024pengine_rpc_output(ID, Term) :-
 2025    output(ID, Term),
 2026    !.
 2027pengine_rpc_output(_ID, Term) :-
 2028    print(Term).
 prompt(+ID, +Prompt, -Term) is semidet
Hook to handle pengine_input/2 from the remote pengine. If the hooks fails, pengine_rpc/3 calls read/1 using the current prompt.
 2035:- multifile prompt/3.
 output(+ID, +Term) is semidet
Hook to handle pengine_output/1 from the remote pengine. If the hook fails, it calls print/1 on Term.
 2042:- multifile output/2. 2043
 2044
 2045/*================= HTTP handlers =======================
 2046*/
 2047
 2048%   Declare  HTTP  locations  we  serve  and   how.  Note  that  we  use
 2049%   time_limit(inifinite) because pengines have their  own timeout. Also
 2050%   note that we use spawn. This  is   needed  because we can easily get
 2051%   many clients waiting for  some  action   on  a  pengine to complete.
 2052%   Without spawning, we would quickly exhaust   the  worker pool of the
 2053%   HTTP server.
 2054%
 2055%   FIXME: probably we should wait for a   short time for the pengine on
 2056%   the default worker thread. Only if  that   time  has expired, we can
 2057%   call http_spawn/2 to continue waiting on   a  new thread. That would
 2058%   improve the performance and reduce the usage of threads.
 2059
 2060:- http_handler(root(pengine),               http_404([]),
 2061                [ id(pengines) ]). 2062:- http_handler(root(pengine/create),        http_pengine_create,
 2063                [ time_limit(infinite), spawn([]) ]). 2064:- http_handler(root(pengine/send),          http_pengine_send,
 2065                [ time_limit(infinite), spawn([]) ]). 2066:- http_handler(root(pengine/pull_response), http_pengine_pull_response,
 2067                [ time_limit(infinite), spawn([]) ]). 2068:- http_handler(root(pengine/abort),         http_pengine_abort,         []). 2069:- http_handler(root(pengine/detach),        http_pengine_detach,        []). 2070:- http_handler(root(pengine/list),          http_pengine_list,          []). 2071:- http_handler(root(pengine/ping),          http_pengine_ping,          []). 2072:- http_handler(root(pengine/destroy_all),   http_pengine_destroy_all,   []). 2073
 2074:- http_handler(root(pengine/'pengines.js'),
 2075                http_reply_file(library('http/web/js/pengines.js'), []), []). 2076:- http_handler(root(pengine/'plterm.css'),
 2077                http_reply_file(library('http/web/css/plterm.css'), []), []).
 http_pengine_create(+Request)
HTTP POST handler for =/pengine/create=. This API accepts the pengine creation parameters both as application/json and as www-form-encoded. Accepted parameters:
ParameterDefaultComment
formatprologOutput format
applicationpengine_sandboxPengine application
chunk1Chunk-size for results
solutionschunkedIf all, emit all results
ask-The query
template-Output template
src_text""Program
src_url-Program to download
disposition-Download location

Note that solutions=all internally uses chunking to obtain the results from the pengine, but the results are combined in a single HTTP reply. This is currently only implemented by the CSV backend that is part of SWISH for downloading unbounded result sets with limited memory resources.

 2104http_pengine_create(Request) :-
 2105    reply_options(Request, [post]),
 2106    !.
 2107http_pengine_create(Request) :-
 2108    memberchk(content_type(CT), Request),
 2109    sub_atom(CT, 0, _, _, 'application/json'),
 2110    !,
 2111    http_read_json_dict(Request, Dict),
 2112    dict_atom_option(format, Dict, Format, prolog),
 2113    dict_atom_option(application, Dict, Application, pengine_sandbox),
 2114    http_pengine_create(Request, Application, Format, Dict).
 2115http_pengine_create(Request) :-
 2116    Optional = [optional(true)],
 2117    OptString = [string|Optional],
 2118    Form = [ format(Format, [default(prolog)]),
 2119             application(Application, [default(pengine_sandbox)]),
 2120             chunk(_, [integer, default(1)]),
 2121             solutions(_, [oneof([all,chunked]), default(chunked)]),
 2122             ask(_, OptString),
 2123             template(_, OptString),
 2124             src_text(_, OptString),
 2125             disposition(_, OptString),
 2126             src_url(_, Optional)
 2127           ],
 2128    http_parameters(Request, Form),
 2129    form_dict(Form, Dict),
 2130    http_pengine_create(Request, Application, Format, Dict).
 2131
 2132dict_atom_option(Key, Dict, Atom, Default) :-
 2133    (   get_dict(Key, Dict, String)
 2134    ->  atom_string(Atom, String)
 2135    ;   Atom = Default
 2136    ).
 2137
 2138form_dict(Form, Dict) :-
 2139    form_values(Form, Pairs),
 2140    dict_pairs(Dict, _, Pairs).
 2141
 2142form_values([], []).
 2143form_values([H|T], Pairs) :-
 2144    arg(1, H, Value),
 2145    nonvar(Value),
 2146    !,
 2147    functor(H, Name, _),
 2148    Pairs = [Name-Value|PairsT],
 2149    form_values(T, PairsT).
 2150form_values([_|T], Pairs) :-
 2151    form_values(T, Pairs).
 http_pengine_create(+Request, +Application, +Format, +OptionsDict)
 2156http_pengine_create(Request, Application, Format, Dict) :-
 2157    current_application(Application),
 2158    !,
 2159    allowed(Request, Application),
 2160    authenticate(Request, Application, UserOptions),
 2161    dict_to_options(Dict, Application, CreateOptions0),
 2162    append(UserOptions, CreateOptions0, CreateOptions),
 2163    pengine_uuid(Pengine),
 2164    message_queue_create(Queue, [max_size(25)]),
 2165    setting(Application:time_limit, TimeLimit),
 2166    get_time(Now),
 2167    asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)),
 2168    broadcast(pengine(create(Pengine, Application, CreateOptions))),
 2169    create(Queue, Pengine, CreateOptions, http, Application),
 2170    create_wait_and_output_result(Pengine, Queue, Format,
 2171                                  TimeLimit, Dict),
 2172    gc_abandoned_queues.
 2173http_pengine_create(_Request, Application, Format, _Dict) :-
 2174    Error = existence_error(pengine_application, Application),
 2175    pengine_uuid(ID),
 2176    output_result(Format, error(ID, error(Error, _))).
 2177
 2178
 2179dict_to_options(Dict, Application, CreateOptions) :-
 2180    dict_pairs(Dict, _, Pairs),
 2181    pairs_create_options(Pairs, Application, CreateOptions).
 2182
 2183pairs_create_options([], _, []) :- !.
 2184pairs_create_options([N-V0|T0], App, [Opt|T]) :-
 2185    Opt =.. [N,V],
 2186    pengine_create_option(Opt), N \== user,
 2187    !,
 2188    (   create_option_type(Opt, atom)
 2189    ->  atom_string(V, V0)               % term creation must be done if
 2190    ;   V = V0                           % we created the source and know
 2191    ),                                   % the operators.
 2192    pairs_create_options(T0, App, T).
 2193pairs_create_options([_|T0], App, T) :-
 2194    pairs_create_options(T0, App, T).
 wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit) is det
Wait for the Pengine's Queue and if there is a message, send it to the requester using output_result/1. If Pengine does not answer within the time specified by the setting time_limit, Pengine is aborted and the result is error(time_limit_exceeded, _).
 2205wait_and_output_result(Pengine, Queue, Format, TimeLimit) :-
 2206    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2207                                 [ timeout(TimeLimit)
 2208                                 ]),
 2209              Error, true)
 2210    ->  (   var(Error)
 2211        ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
 2212            ignore(destroy_queue_from_http(Pengine, Event, Queue)),
 2213            output_result(Format, Event)
 2214        ;   output_result(Format, died(Pengine))
 2215        )
 2216    ;   time_limit_exceeded(Pengine, Format)
 2217    ).
 create_wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit, +Dict) is det
Intercepts the `solutions=all' case used for downloading results. Dict may contain a disposition key to denote the download location.
 2226create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
 2227    get_dict(solutions, Dict, all),
 2228    !,
 2229    between(1, infinite, Page),
 2230    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2231                                 [ timeout(TimeLimit)
 2232                                 ]),
 2233              Error, true)
 2234    ->  (   var(Error)
 2235        ->  debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Event, Queue]),
 2236            (   destroy_queue_from_http(Pengine, Event, Queue)
 2237            ->  !, output_result(Format, page(Page, Event), Dict)
 2238            ;   is_more_event(Event)
 2239            ->  pengine_thread(Pengine, Thread),
 2240                thread_send_message(Thread, pengine_request(next)),
 2241                output_result(Format, page(Page, Event), Dict),
 2242                fail
 2243            ;   !, output_result(Format, page(Page, Event), Dict)
 2244            )
 2245        ;   !, output_result(Format, died(Pengine))
 2246        )
 2247    ;   !, time_limit_exceeded(Pengine, Format)
 2248    ),
 2249    !.
 2250create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, _Dict) :-
 2251    wait_and_output_result(Pengine, Queue, Format, TimeLimit).
 2252
 2253is_more_event(success(_Id, _Answers, _Projection, _Time, true)).
 2254is_more_event(create(_, Options)) :-
 2255    memberchk(answer(Event), Options),
 2256    is_more_event(Event).
 time_limit_exceeded(+Pengine, +Format)
The Pengine did not reply within its time limit. Send a reply to the client in the requested format and interrupt the Pengine.
bug
- Ideally, if the Pengine has destroy set to false, we should get the Pengine back to its main loop. Unfortunately we only have normal exceptions that may be caught by the Pengine and abort which cannot be caught and thus destroys the Pengine.
 2270time_limit_exceeded(Pengine, Format) :-
 2271    call_cleanup(
 2272        pengine_destroy(Pengine, [force(true)]),
 2273        output_result(Format,
 2274                      destroy(Pengine,
 2275                              error(Pengine, time_limit_exceeded)))).
 destroy_queue_from_http(+Pengine, +Event, +Queue) is semidet
Consider destroying the output queue for Pengine after sending Event back to the HTTP client. We can destroy the queue if
To be done
- If the client did not request all output, the queue will not be destroyed. We need some timeout and GC for that.
 2290destroy_queue_from_http(ID, _, Queue) :-
 2291    output_queue(ID, Queue, _),
 2292    !,
 2293    destroy_queue_if_empty(Queue).
 2294destroy_queue_from_http(ID, Event, Queue) :-
 2295    debug(pengine(destroy), 'DESTROY? ~p', [Event]),
 2296    is_destroy_event(Event),
 2297    !,
 2298    message_queue_property(Queue, size(Waiting)),
 2299    debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]),
 2300    with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)).
 2301
 2302is_destroy_event(destroy(_)).
 2303is_destroy_event(destroy(_,_)).
 2304is_destroy_event(create(_, Options)) :-
 2305    memberchk(answer(Event), Options),
 2306    is_destroy_event(Event).
 2307
 2308destroy_queue_if_empty(Queue) :-
 2309    thread_peek_message(Queue, _),
 2310    !.
 2311destroy_queue_if_empty(Queue) :-
 2312    retractall(output_queue(_, Queue, _)),
 2313    message_queue_destroy(Queue).
 gc_abandoned_queues
Check whether there are queues that have been abadoned. This happens if the stream contains output events and not all of them are read by the client.
 2321:- dynamic
 2322    last_gc/1. 2323
 2324gc_abandoned_queues :-
 2325    consider_queue_gc,
 2326    !,
 2327    get_time(Now),
 2328    (   output_queue(_, Queue, Time),
 2329        Now-Time > 15*60,
 2330        retract(output_queue(_, Queue, Time)),
 2331        message_queue_destroy(Queue),
 2332        fail
 2333    ;   retractall(last_gc(_)),
 2334        asserta(last_gc(Now))
 2335    ).
 2336gc_abandoned_queues.
 2337
 2338consider_queue_gc :-
 2339    predicate_property(output_queue(_,_,_), number_of_clauses(N)),
 2340    N > 100,
 2341    (   last_gc(Time),
 2342        get_time(Now),
 2343        Now-Time > 5*60
 2344    ->  true
 2345    ;   \+ last_gc(_)
 2346    ).
 sync_destroy_queue_from_http(+Pengine, +Queue) is det
 sync_delay_destroy_queue(+Pengine, +Queue) is det
Handle destruction of the message queue connecting the HTTP side to the pengine. We cannot delete the queue when the pengine dies because the queue may contain output events. Termination of the pengine and finishing the HTTP exchange may happen in both orders. This means we need handle this using synchronization.
sync_destroy_queue_from_pengine(+Pengine, +Queue)
Called (indirectly) from pengine_done/1 if the pengine's thread dies.
sync_destroy_queue_from_http(+Pengine, +Queue)
Called from destroy_queue/3, from wait_and_output_result/4, i.e., from the HTTP side.
 2364:- dynamic output_queue_destroyed/1. 2365
 2366sync_destroy_queue_from_http(ID, Queue) :-
 2367    (   output_queue(ID, Queue, _)
 2368    ->  destroy_queue_if_empty(Queue)
 2369    ;   thread_peek_message(Queue, pengine_event(_, output(_,_)))
 2370    ->  debug(pengine(destroy), 'Delay destruction of ~p because of output',
 2371              [Queue]),
 2372        get_time(Now),
 2373        asserta(output_queue(ID, Queue, Now))
 2374    ;   message_queue_destroy(Queue),
 2375        asserta(output_queue_destroyed(Queue))
 2376    ).
 sync_destroy_queue_from_pengine(+Pengine, +Queue)
Called from pengine_unregister/1 when the pengine thread terminates. It is called while the mutex pengine held.
 2383sync_destroy_queue_from_pengine(ID, Queue) :-
 2384    (   retract(output_queue_destroyed(Queue))
 2385    ->  true
 2386    ;   get_time(Now),
 2387        asserta(output_queue(ID, Queue, Now))
 2388    ),
 2389    retractall(pengine_queue(ID, Queue, _, _)).
 2390
 2391
 2392http_pengine_send(Request) :-
 2393    reply_options(Request, [get,post]),
 2394    !.
 2395http_pengine_send(Request) :-
 2396    http_parameters(Request,
 2397                    [ id(ID, [ type(atom) ]),
 2398                      event(EventString, [optional(true)]),
 2399                      format(Format, [default(prolog)])
 2400                    ]),
 2401    catch(read_event(ID, Request, Format, EventString, Event),
 2402          Error,
 2403          true),
 2404    (   var(Error)
 2405    ->  debug(pengine(event), 'HTTP send: ~p', [Event]),
 2406        (   pengine_thread(ID, Thread)
 2407        ->  pengine_queue(ID, Queue, TimeLimit, _),
 2408            random_delay,
 2409            broadcast(pengine(send(ID, Event))),
 2410            thread_send_message(Thread, pengine_request(Event)),
 2411            wait_and_output_result(ID, Queue, Format, TimeLimit)
 2412        ;   atom(ID)
 2413        ->  pengine_died(Format, ID)
 2414        ;   http_404([], Request)
 2415        )
 2416    ;   Error = error(existence_error(pengine, ID), _)
 2417    ->  pengine_died(Format, ID)
 2418    ;   output_result(Format, error(ID, Error))
 2419    ).
 2420
 2421pengine_died(Format, Pengine) :-
 2422    output_result(Format, error(Pengine,
 2423                                error(existence_error(pengine, Pengine),_))).
 read_event(+Pengine, +Request, +Format, +EventString, -Event) is det
Read an event on behalve of Pengine. Note that the pengine's module should not be deleted while we are reading using its syntax (module). This is ensured using the pengine_done mutex.
See also
- pengine_done/0.
 2434read_event(Pengine, Request, Format, EventString, Event) :-
 2435    with_mutex(
 2436        pengine_done,
 2437        ( pengine_thread(Pengine, _Thread),           % check existence
 2438          get_pengine_module(Pengine, Module),
 2439          read_event_2(Request, EventString, Module, Event0, Bindings)
 2440        )),
 2441    !,
 2442    fix_bindings(Format, Event0, Bindings, Event).
 2443read_event(Pengine, Request, _Format, _EventString, _Event) :-
 2444    debug(pengine(event), 'Pengine ~q vanished', [Pengine]),
 2445    discard_post_data(Request),
 2446    existence_error(pengine, Pengine).
 read_event_(+Request, +EventString, +Module, -Event, -Bindings)
Read the sent event. The event is a Prolog term that is either in the event parameter or as a posted document.
 2454read_event_2(_Request, EventString, Module, Event, Bindings) :-
 2455    nonvar(EventString),
 2456    !,
 2457    term_string(Event, EventString,
 2458                [ variable_names(Bindings),
 2459                  module(Module)
 2460                ]).
 2461read_event_2(Request, _EventString, Module, Event, Bindings) :-
 2462    option(method(post), Request),
 2463    http_read_data(Request,     Event,
 2464                   [ content_type('application/x-prolog'),
 2465                     module(Module),
 2466                     variable_names(Bindings)
 2467                   ]).
 discard_post_data(+Request) is det
If this is a POST request, discard the posted data.
 2473discard_post_data(Request) :-
 2474    option(method(post), Request),
 2475    !,
 2476    setup_call_cleanup(
 2477        open_null_stream(NULL),
 2478        http_read_data(Request, _, [to(stream(NULL))]),
 2479        close(NULL)).
 2480discard_post_data(_).
 fix_bindings(+Format, +EventIn, +Bindings, -Event) is det
Generate the template for json(-s) Format from the variables in the asked Goal. Variables starting with an underscore, followed by an capital letter are ignored from the template.
 2488fix_bindings(Format,
 2489             ask(Goal, Options0), Bindings,
 2490             ask(Goal, NewOptions)) :-
 2491    json_lang(Format),
 2492    !,
 2493    exclude(anon, Bindings, NamedBindings),
 2494    template(NamedBindings, Template, Options0, Options1),
 2495    select_option(chunk(Paging), Options1, Options2, 1),
 2496    NewOptions = [ template(Template),
 2497                   chunk(Paging),
 2498                   bindings(NamedBindings)
 2499                 | Options2
 2500                 ].
 2501fix_bindings(_, Command, _, Command).
 2502
 2503template(_, Template, Options0, Options) :-
 2504    select_option(template(Template), Options0, Options),
 2505    !.
 2506template(Bindings, Template, Options, Options) :-
 2507    dict_create(Template, swish_default_template, Bindings).
 2508
 2509anon(Name=_) :-
 2510    sub_atom(Name, 0, _, _, '_'),
 2511    sub_atom(Name, 1, 1, _, Next),
 2512    char_type(Next, prolog_var_start).
 2513
 2514var_name(Name=_, Name).
 json_lang(+Format) is semidet
True if Format is a JSON variation.
 2521json_lang(json) :- !.
 2522json_lang(Format) :-
 2523    sub_atom(Format, 0, _, _, 'json-').
 http_pengine_pull_response(+Request)
HTTP handler for /pengine/pull_response. Pulls possible pending messages from the pengine.
 2530http_pengine_pull_response(Request) :-
 2531    reply_options(Request, [get]),
 2532    !.
 2533http_pengine_pull_response(Request) :-
 2534    http_parameters(Request,
 2535            [   id(ID, []),
 2536                format(Format, [default(prolog)])
 2537            ]),
 2538    reattach(ID),
 2539    (   (   pengine_queue(ID, Queue, TimeLimit, _)
 2540        ->  true
 2541        ;   output_queue(ID, Queue, _),
 2542            TimeLimit = 0
 2543        )
 2544    ->  wait_and_output_result(ID, Queue, Format, TimeLimit)
 2545    ;   http_404([], Request)
 2546    ).
 http_pengine_abort(+Request)
HTTP handler for /pengine/abort. Note that abort may be sent at any time and the reply may be handled by a pull_response. In that case, our pengine has already died before we get to wait_and_output_result/4.
 2555http_pengine_abort(Request) :-
 2556    reply_options(Request, [get,post]),
 2557    !.
 2558http_pengine_abort(Request) :-
 2559    http_parameters(Request,
 2560            [   id(ID, [])
 2561            ]),
 2562    (   pengine_thread(ID, _Thread)
 2563    ->  broadcast(pengine(abort(ID))),
 2564        abort_pending_output(ID),
 2565        pengine_abort(ID),
 2566        reply_json(true)
 2567    ;   http_404([], Request)
 2568    ).
 http_pengine_detach(+Request)
Detach a Pengine while keeping it running. This has the following consequences:
 2580http_pengine_detach(Request) :-
 2581    reply_options(Request, [post]),
 2582    !.
 2583http_pengine_detach(Request) :-
 2584    http_parameters(Request,
 2585                    [ id(ID, [])
 2586                    ]),
 2587    http_read_json_dict(Request, ClientData),
 2588    (   pengine_property(ID, application(Application)),
 2589        allowed(Request, Application),
 2590        authenticate(Request, Application, _UserOptions)
 2591    ->  broadcast(pengine(detach(ID))),
 2592        get_time(Now),
 2593        assertz(pengine_detached(ID, ClientData.put(time, Now))),
 2594        pengine_queue(ID, Queue, _TimeLimit, _Now),
 2595        message_queue_set(Queue, max_size(1000)),
 2596        pengine_reply(Queue, detached(ID)),
 2597        reply_json(true)
 2598    ;   http_404([], Request)
 2599    ).
 2600
 2601:- if(\+current_predicate(message_queue_set/2)). 2602message_queue_set(_,_).
 2603:- endif. 2604
 2605reattach(ID) :-
 2606    (   retract(pengine_detached(ID, _Data)),
 2607        pengine_queue(ID, Queue, _TimeLimit, _Now)
 2608    ->  message_queue_set(Queue, max_size(25))
 2609    ;   true
 2610    ).
 http_pengine_destroy_all(+Request)
Destroy a list of pengines. Normally called by pengines.js if the browser window is closed.
 2618http_pengine_destroy_all(Request) :-
 2619    reply_options(Request, [get,post]),
 2620    !.
 2621http_pengine_destroy_all(Request) :-
 2622    http_parameters(Request,
 2623                    [ ids(IDsAtom, [])
 2624                    ]),
 2625    atomic_list_concat(IDs, ',', IDsAtom),
 2626    forall(( member(ID, IDs),
 2627             \+ pengine_detached(ID, _)
 2628           ),
 2629           pengine_destroy(ID, [force(true)])),
 2630    reply_json("ok").
 http_pengine_ping(+Request)
HTTP handler for /pengine/ping. If the requested Pengine is alive and event status(Pengine, Stats) is created, where Stats is the return of thread_statistics/2.
 2638http_pengine_ping(Request) :-
 2639    reply_options(Request, [get]),
 2640    !.
 2641http_pengine_ping(Request) :-
 2642    http_parameters(Request,
 2643                    [ id(Pengine, []),
 2644                      format(Format, [default(prolog)])
 2645                    ]),
 2646    (   pengine_thread(Pengine, Thread),
 2647        catch(thread_statistics(Thread, Stats), _, fail)
 2648    ->  output_result(Format, ping(Pengine, Stats))
 2649    ;   output_result(Format, died(Pengine))
 2650    ).
 http_pengine_list(+Request)
HTTP handler for `/pengine/list`, providing information about running Pengines.
To be done
- Only list detached Pengines associated to the logged in user.
 2659http_pengine_list(Request) :-
 2660    reply_options(Request, [get]),
 2661    !.
 2662http_pengine_list(Request) :-
 2663    http_parameters(Request,
 2664                    [ status(Status, [default(detached), oneof([detached])]),
 2665                      application(Application, [default(pengine_sandbox)])
 2666                    ]),
 2667    allowed(Request, Application),
 2668    authenticate(Request, Application, _UserOptions),
 2669    findall(Term, listed_pengine(Application, Status, Term), Terms),
 2670    reply_json(json{pengines: Terms}).
 2671
 2672listed_pengine(Application, detached, State) :-
 2673    State = pengine{id:Id,
 2674                    detached:Time,
 2675                    queued:Queued,
 2676                    stats:Stats},
 2677
 2678    pengine_property(Id, application(Application)),
 2679    pengine_property(Id, detached(Time)),
 2680    pengine_queue(Id, Queue, _TimeLimit, _Now),
 2681    message_queue_property(Queue, size(Queued)),
 2682    (   pengine_thread(Id, Thread),
 2683        catch(thread_statistics(Thread, Stats), _, fail)
 2684    ->  true
 2685    ;   Stats = thread{status:died}
 2686    ).
 output_result(+Format, +EventTerm) is det
 output_result(+Format, +EventTerm, +OptionsDict) is det
Formulate an HTTP response from a pengine event term. Format is one of prolog, json or json-s.
 2695:- dynamic
 2696    pengine_replying/2.             % +Pengine, +Thread
 2697
 2698output_result(Format, Event) :-
 2699    arg(1, Event, Pengine),
 2700    thread_self(Thread),
 2701    cors_enable,            % contingent on http:cors setting
 2702    disable_client_cache,
 2703    setup_call_cleanup(
 2704        asserta(pengine_replying(Pengine, Thread), Ref),
 2705        catch(output_result(Format, Event, _{}),
 2706              pengine_abort_output,
 2707              true),
 2708        erase(Ref)).
 2709
 2710output_result(Lang, Event, Dict) :-
 2711    write_result(Lang, Event, Dict),
 2712    !.
 2713output_result(prolog, Event, _) :-
 2714    !,
 2715    format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
 2716    write_term(Event,
 2717               [ quoted(true),
 2718                 ignore_ops(true),
 2719                 fullstop(true),
 2720                 blobs(portray),
 2721                 portray_goal(portray_blob),
 2722                 nl(true)
 2723               ]).
 2724output_result(Lang, Event, _) :-
 2725    json_lang(Lang),
 2726    !,
 2727    (   event_term_to_json_data(Event, JSON, Lang)
 2728    ->  reply_json(JSON)
 2729    ;   assertion(event_term_to_json_data(Event, _, Lang))
 2730    ).
 2731output_result(Lang, _Event, _) :-    % FIXME: allow for non-JSON format
 2732    domain_error(pengine_format, Lang).
 portray_blob(+Blob, +Options) is det
Portray non-text blobs that may appear in output terms. Not really sure about that. Basically such terms need to be avoided as they are meaningless outside the process. The generated error is hard to debug though, so now we send them as '$BLOB'(Type). Future versions may include more info, depending on Type.
 2742:- public portray_blob/2.               % called from write-term
 2743portray_blob(Blob, _Options) :-
 2744    blob(Blob, Type),
 2745    writeq('$BLOB'(Type)).
 abort_pending_output(+Pengine) is det
If we get an abort, it is possible that output is being produced for the client. This predicate aborts these threads.
 2752abort_pending_output(Pengine) :-
 2753    forall(pengine_replying(Pengine, Thread),
 2754           abort_output_thread(Thread)).
 2755
 2756abort_output_thread(Thread) :-
 2757    catch(thread_signal(Thread, throw(pengine_abort_output)),
 2758          error(existence_error(thread, _), _),
 2759          true).
 write_result(+Lang, +Event, +Dict) is semidet
Hook that allows for different output formats. The core Pengines library supports prolog and various JSON dialects. The hook event_to_json/3 can be used to refine the JSON dialects. This hook must be used if a completely different output format is desired.
 disable_client_cache
Make sure the client will not cache our page.
See also
- http://stackoverflow.com/questions/49547/making-sure-a-web-page-is-not-cached-across-all-browsers
 2775disable_client_cache :-
 2776    format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c
 2777            Pragma: no-cache\r\n\c
 2778            Expires: 0\r\n').
 2779
 2780event_term_to_json_data(Event, JSON, Lang) :-
 2781    event_to_json(Event, JSON, Lang),
 2782    !.
 2783event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
 2784                        json{event:success, id:ID, time:Time,
 2785                             data:Bindings, more:More, projection:Projection},
 2786                        json) :-
 2787    !,
 2788    term_to_json(Bindings0, Bindings).
 2789event_term_to_json_data(destroy(ID, Event),
 2790                        json{event:destroy, id:ID, data:JSON},
 2791                        Style) :-
 2792    !,
 2793    event_term_to_json_data(Event, JSON, Style).
 2794event_term_to_json_data(create(ID, Features0), JSON, Style) :-
 2795    !,
 2796    (   select(answer(First0), Features0, Features1)
 2797    ->  event_term_to_json_data(First0, First, Style),
 2798        Features = [answer(First)|Features1]
 2799    ;   Features = Features0
 2800    ),
 2801    dict_create(JSON, json, [event(create), id(ID)|Features]).
 2802event_term_to_json_data(destroy(ID, Event),
 2803                        json{event:destroy, id:ID, data:JSON}, Style) :-
 2804    !,
 2805    event_term_to_json_data(Event, JSON, Style).
 2806event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
 2807    !,
 2808    Error0 = json{event:error, id:ID, data:Message},
 2809    add_error_details(ErrorTerm, Error0, Error),
 2810    message_to_string(ErrorTerm, Message).
 2811event_term_to_json_data(failure(ID, Time),
 2812                        json{event:failure, id:ID, time:Time}, _) :-
 2813    !.
 2814event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
 2815    functor(EventTerm, F, 1),
 2816    !,
 2817    arg(1, EventTerm, ID).
 2818event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
 2819    functor(EventTerm, F, 2),
 2820    arg(1, EventTerm, ID),
 2821    arg(2, EventTerm, Data),
 2822    term_to_json(Data, JSON).
 2823
 2824:- public add_error_details/3.
 add_error_details(+Error, +JSON0, -JSON)
Add format error code and location information to an error. Also used by pengines_io.pl.
 2831add_error_details(Error, JSON0, JSON) :-
 2832    add_error_code(Error, JSON0, JSON1),
 2833    add_error_location(Error, JSON1, JSON).
 add_error_code(+Error, +JSON0, -JSON) is det
Add a code field to JSON0 of Error is an ISO error term. The error code is the functor name of the formal part of the error, e.g., syntax_error, type_error, etc. Some errors carry more information:
existence_error(Type, Obj)
{arg1:Type, arg2:Obj}, where Obj is stringified of it is not atomic.
 2846add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :-
 2847    atom(Type),
 2848    !,
 2849    to_atomic(Obj, Value),
 2850    Error = Error0.put(_{code:existence_error, arg1:Type, arg2:Value}).
 2851add_error_code(error(Formal, _), Error0, Error) :-
 2852    callable(Formal),
 2853    !,
 2854    functor(Formal, Code, _),
 2855    Error = Error0.put(code, Code).
 2856add_error_code(_, Error, Error).
 2857
 2858% What to do with large integers?
 2859to_atomic(Obj, Atomic) :- atom(Obj),   !, Atomic = Obj.
 2860to_atomic(Obj, Atomic) :- number(Obj), !, Atomic = Obj.
 2861to_atomic(Obj, Atomic) :- string(Obj), !, Atomic = Obj.
 2862to_atomic(Obj, Atomic) :- term_string(Obj, Atomic).
 add_error_location(+Error, +JSON0, -JSON) is det
Add a location property if the error can be associated with a source location. The location is an object with properties file and line and, if available, the character location in the line.
 2871add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :-
 2872    atom(Path), integer(Line),
 2873    !,
 2874    Term = Term0.put(_{location:_{file:Path, line:Line}}).
 2875add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :-
 2876    atom(Path), integer(Line), integer(Ch),
 2877    !,
 2878    Term = Term0.put(_{location:_{file:Path, line:Line, ch:Ch}}).
 2879add_error_location(_, Term, Term).
 event_to_json(+Event, -JSONTerm, +Lang) is semidet
Hook that translates a Pengine event structure into a term suitable for reply_json/1, according to the language specification Lang. This can be used to massage general Prolog terms, notably associated with success(ID, Bindings, Projection, Time, More) and output(ID, Term) into a format suitable for processing at the client side.
 2890%:- multifile pengines:event_to_json/3.
 2891
 2892
 2893                 /*******************************
 2894                 *        ACCESS CONTROL        *
 2895                 *******************************/
 allowed(+Request, +Application) is det
Check whether the peer is allowed to connect. Returns a forbidden header if contact is not allowed.
 2902allowed(Request, Application) :-
 2903    setting(Application:allow_from, Allow),
 2904    match_peer(Request, Allow),
 2905    setting(Application:deny_from, Deny),
 2906    \+ match_peer(Request, Deny),
 2907    !.
 2908allowed(Request, _Application) :-
 2909    memberchk(request_uri(Here), Request),
 2910    throw(http_reply(forbidden(Here))).
 2911
 2912match_peer(_, Allowed) :-
 2913    memberchk(*, Allowed),
 2914    !.
 2915match_peer(_, []) :- !, fail.
 2916match_peer(Request, Allowed) :-
 2917    http_peer(Request, Peer),
 2918    debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
 2919    (   memberchk(Peer, Allowed)
 2920    ->  true
 2921    ;   member(Pattern, Allowed),
 2922        match_peer_pattern(Pattern, Peer)
 2923    ).
 2924
 2925match_peer_pattern(Pattern, Peer) :-
 2926    ip_term(Pattern, IP),
 2927    ip_term(Peer, IP),
 2928    !.
 2929
 2930ip_term(Peer, Pattern) :-
 2931    split_string(Peer, ".", "", PartStrings),
 2932    ip_pattern(PartStrings, Pattern).
 2933
 2934ip_pattern([], []).
 2935ip_pattern([*], _) :- !.
 2936ip_pattern([S|T0], [N|T]) :-
 2937    number_string(N, S),
 2938    ip_pattern(T0, T).
 authenticate(+Request, +Application, -UserOptions:list) is det
Call authentication_hook/3, returning either [user(User)], [] or an exception.
 2946authenticate(Request, Application, UserOptions) :-
 2947    authentication_hook(Request, Application, User),
 2948    !,
 2949    must_be(ground, User),
 2950    UserOptions = [user(User)].
 2951authenticate(_, _, []).
 authentication_hook(+Request, +Application, -User) is semidet
This hook is called from the =/pengine/create= HTTP handler to discover whether the server is accessed by an authorized user. It can react in three ways:
See also
- http_authenticate/3 can be used to implement this hook using default HTTP authentication data.
 2973pengine_register_user(Options) :-
 2974    option(user(User), Options),
 2975    !,
 2976    pengine_self(Me),
 2977    asserta(pengine_user(Me, User)).
 2978pengine_register_user(_).
 pengine_user(-User) is semidet
True when the pengine was create by an HTTP request that authorized User.
See also
- authentication_hook/3 can be used to extract authorization from the HTTP header.
 2989pengine_user(User) :-
 2990    pengine_self(Me),
 2991    pengine_user(Me, User).
 reply_options(+Request, +Methods) is semidet
Reply the HTTP OPTIONS request
 2997reply_options(Request, Allowed) :-
 2998    option(method(options), Request),
 2999    !,
 3000    cors_enable(Request,
 3001                [ methods(Allowed)
 3002                ]),
 3003    format('Content-type: text/plain\r\n'),
 3004    format('~n').                   % empty body
 3005
 3006
 3007                 /*******************************
 3008                 *        COMPILE SOURCE        *
 3009                 *******************************/
 pengine_src_text(+SrcText, +Module) is det
Asserts the clauses defined in SrcText in the private database of the current Pengine. This predicate processes the `src_text' option of pengine_create/1. */
 3018pengine_src_text(Src, Module) :-
 3019    pengine_self(Self),
 3020    format(atom(ID), 'pengine://~w/src', [Self]),
 3021    extra_load_options(Self, Options),
 3022    setup_call_cleanup(
 3023        open_chars_stream(Src, Stream),
 3024        load_files(Module:ID,
 3025                   [ stream(Stream),
 3026                     module(Module),
 3027                     silent(true)
 3028                   | Options
 3029                   ]),
 3030        close(Stream)),
 3031    keep_source(Self, ID, Src).
 3032
 3033system:'#file'(File, _Line) :-
 3034    prolog_load_context(stream, Stream),
 3035    set_stream(Stream, file_name(File)),
 3036    set_stream(Stream, record_position(false)),
 3037    set_stream(Stream, record_position(true)).
 pengine_src_url(+URL, +Module) is det
Asserts the clauses defined in URL in the private database of the current Pengine. This predicate processes the `src_url' option of pengine_create/1.
To be done
- : make a sensible guess at the encoding.
 3047pengine_src_url(URL, Module) :-
 3048    pengine_self(Self),
 3049    uri_encoded(path, URL, Path),
 3050    format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
 3051    extra_load_options(Self, Options),
 3052    (   get_pengine_application(Self, Application),
 3053        setting(Application:debug_info, false)
 3054    ->  setup_call_cleanup(
 3055            http_open(URL, Stream, []),
 3056            ( set_stream(Stream, encoding(utf8)),
 3057              load_files(Module:ID,
 3058                         [ stream(Stream),
 3059                           module(Module)
 3060                         | Options
 3061                         ])
 3062            ),
 3063            close(Stream))
 3064    ;   setup_call_cleanup(
 3065            http_open(URL, TempStream, []),
 3066            ( set_stream(TempStream, encoding(utf8)),
 3067              read_string(TempStream, _, Src)
 3068            ),
 3069            close(TempStream)),
 3070        setup_call_cleanup(
 3071            open_chars_stream(Src, Stream),
 3072            load_files(Module:ID,
 3073                       [ stream(Stream),
 3074                         module(Module)
 3075                       | Options
 3076                       ]),
 3077            close(Stream)),
 3078        keep_source(Self, ID, Src)
 3079    ).
 3080
 3081
 3082extra_load_options(Pengine, Options) :-
 3083    pengine_not_sandboxed(Pengine),
 3084    !,
 3085    Options = [].
 3086extra_load_options(_, [sandboxed(true)]).
 3087
 3088
 3089keep_source(Pengine, ID, SrcText) :-
 3090    get_pengine_application(Pengine, Application),
 3091    setting(Application:debug_info, true),
 3092    !,
 3093    to_string(SrcText, SrcString),
 3094    assertz(pengine_data(Pengine, source(ID, SrcString))).
 3095keep_source(_, _, _).
 3096
 3097to_string(String, String) :-
 3098    string(String),
 3099    !.
 3100to_string(Atom, String) :-
 3101    atom_string(Atom, String),
 3102    !.
 3103
 3104		 /*******************************
 3105		 *            SANDBOX		*
 3106		 *******************************/
 3107
 3108:- multifile
 3109    sandbox:safe_primitive/1. 3110
 3111sandbox:safe_primitive(pengines:pengine_input(_, _)).
 3112sandbox:safe_primitive(pengines:pengine_output(_)).
 3113sandbox:safe_primitive(pengines:pengine_debug(_,_)).
 3114
 3115
 3116                 /*******************************
 3117                 *            MESSAGES          *
 3118                 *******************************/
 3119
 3120prolog:error_message(sandbox(time_limit_exceeded, Limit)) -->
 3121    [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl,
 3122      'This is normally caused by an insufficiently instantiated'-[], nl,
 3123      'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl,
 3124      'find all possible instantations of Var.'-[]
 3125    ]