View source with raw comments or as raw
    1:- encoding(utf8).
    2/*  Part of SWI-Prolog
    3
    4    Author:        Torbjörn Lager and Jan Wielemaker
    5    E-mail:        J.Wielemaker@vu.nl
    6    WWW:           http://www.swi-prolog.org
    7    Copyright (C): 2014-2023, Torbjörn Lager,
    8                              VU University Amsterdam
    9                              SWI-Prolog Solutions b.v.
   10    All rights reserved.
   11
   12    Redistribution and use in source and binary forms, with or without
   13    modification, are permitted provided that the following conditions
   14    are met:
   15
   16    1. Redistributions of source code must retain the above copyright
   17       notice, this list of conditions and the following disclaimer.
   18
   19    2. Redistributions in binary form must reproduce the above copyright
   20       notice, this list of conditions and the following disclaimer in
   21       the documentation and/or other materials provided with the
   22       distribution.
   23
   24    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   25    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   26    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   27    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   28    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   29    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   30    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   31    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   32    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   33    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   34    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   35    POSSIBILITY OF SUCH DAMAGE.
   36*/
   37
   38:- module(pengines,
   39          [ pengine_create/1,                   % +Options
   40            pengine_ask/3,                      % +Pengine, :Query, +Options
   41            pengine_next/2,                     % +Pengine, +Options
   42            pengine_stop/2,                     % +Pengine, +Options
   43            pengine_event/2,                    % -Event, +Options
   44            pengine_input/2,                    % +Prompt, -Term
   45            pengine_output/1,                   % +Term
   46            pengine_respond/3,                  % +Pengine, +Input, +Options
   47            pengine_debug/2,                    % +Format, +Args
   48            pengine_self/1,                     % -Pengine
   49            pengine_pull_response/2,            % +Pengine, +Options
   50            pengine_destroy/1,                  % +Pengine
   51            pengine_destroy/2,                  % +Pengine, +Options
   52            pengine_abort/1,                    % +Pengine
   53            pengine_application/1,              % +Application
   54            current_pengine_application/1,      % ?Application
   55            pengine_property/2,                 % ?Pengine, ?Property
   56            pengine_user/1,                     % -User
   57            pengine_event_loop/2,               % :Closure, +Options
   58            pengine_rpc/2,                      % +Server, :Goal
   59            pengine_rpc/3                       % +Server, :Goal, +Options
   60          ]).

Pengines: Web Logic Programming Made Easy

The library(pengines) provides an infrastructure for creating Prolog engines in a (remote) pengine server and accessing these engines either from Prolog or JavaScript.

author
- Torbjörn Lager and Jan Wielemaker */
   71:- autoload(library(aggregate),[aggregate_all/3]).   72:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]).   73:- autoload(library(broadcast),[broadcast/1]).   74:- autoload(library(charsio),[open_chars_stream/2]).   75:- use_module(library(debug),[debug/1,debugging/1,debug/3,assertion/1]).   76:- autoload(library(error),
   77	    [ must_be/2,
   78	      existence_error/2,
   79	      permission_error/3,
   80	      domain_error/2
   81	    ]).   82:- autoload(library(filesex),[directory_file_path/3]).   83:- autoload(library(listing),[listing/1]).   84:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]).   85:- autoload(library(modules),[in_temporary_module/3]).   86:- autoload(library(occurs),[sub_term/2]).   87:- autoload(library(option),
   88	    [select_option/3,option/2,option/3,select_option/4]).   89:- autoload(library(prolog_stack),[print_prolog_backtrace/2]).   90:- autoload(library(sandbox),[safe_goal/1]).   91:- autoload(library(statistics),[thread_statistics/2]).   92:- autoload(library(term_to_json),[term_to_json/2]).   93:- autoload(library(thread_pool),
   94	    [thread_pool_create/3,thread_create_in_pool/4]).   95:- autoload(library(time),[alarm/4,call_with_time_limit/2]).   96:- autoload(library(uri),
   97	    [ uri_components/2,
   98	      uri_query_components/2,
   99	      uri_data/3,
  100	      uri_data/4,
  101	      uri_encoded/3
  102	    ]).  103:- autoload(library(http/http_client),[http_read_data/3]).  104:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]).  105:- autoload(library(http/http_dispatch),
  106	    [http_handler/3,http_404/2,http_reply_file/3]).  107:- autoload(library(http/http_open),[http_open/3]).  108:- autoload(library(http/http_parameters),[http_parameters/2]).  109:- autoload(library(http/http_stream),[is_cgi_stream/1]).  110:- autoload(library(http/http_wrapper),[http_peer/2]).  111
  112:- use_module(library(settings),[setting/2,setting/4]).  113:- use_module(library(http/http_json),
  114              [http_read_json_dict/2,reply_json/1]).  115
  116:- if(exists_source(library(uuid))).  117:- autoload(library(uuid), [uuid/2]).  118:- endif.  119
  120
  121:- meta_predicate
  122    pengine_create(:),
  123    pengine_rpc(+, +, :),
  124    pengine_event_loop(1, +).  125
  126:- multifile
  127    write_result/3,                 % +Format, +Event, +Dict
  128    event_to_json/3,                % +Event, -JSON, +Format
  129    prepare_module/3,               % +Module, +Application, +Options
  130    prepare_goal/3,                 % +GoalIn, -GoalOut, +Options
  131    authentication_hook/3,          % +Request, +Application, -User
  132    not_sandboxed/2.                % +User, +App
  133
  134:- predicate_options(pengine_create/1, 1,
  135                     [ id(-atom),
  136                       alias(atom),
  137                       application(atom),
  138                       destroy(boolean),
  139                       server(atom),
  140                       ask(compound),
  141                       template(compound),
  142                       chunk(integer;oneof([false])),
  143                       bindings(list),
  144                       src_list(list),
  145                       src_text(any),           % text
  146                       src_url(atom),
  147                       src_predicates(list)
  148                     ]).  149:- predicate_options(pengine_ask/3, 3,
  150                     [ template(any),
  151                       chunk(integer;oneof([false])),
  152                       bindings(list)
  153                     ]).  154:- predicate_options(pengine_next/2, 2,
  155                     [ chunk(integer),
  156                       pass_to(pengine_send/3, 3)
  157                     ]).  158:- predicate_options(pengine_stop/2, 2,
  159                     [ pass_to(pengine_send/3, 3)
  160                     ]).  161:- predicate_options(pengine_respond/3, 2,
  162                     [ pass_to(pengine_send/3, 3)
  163                     ]).  164:- predicate_options(pengine_rpc/3, 3,
  165                     [ chunk(integer;oneof([false])),
  166                       pass_to(pengine_create/1, 1)
  167                     ]).  168:- predicate_options(pengine_send/3, 3,
  169                     [ delay(number)
  170                     ]).  171:- predicate_options(pengine_event/2, 2,
  172                     [ listen(atom),
  173                       pass_to(system:thread_get_message/3, 3)
  174                     ]).  175:- predicate_options(pengine_pull_response/2, 2,
  176                     [ pass_to(http_open/3, 3)
  177                     ]).  178:- predicate_options(pengine_event_loop/2, 2,
  179                     []).                       % not yet implemented
  180
  181% :- debug(pengine(transition)).
  182:- debug(pengine(debug)).               % handle pengine_debug in pengine_rpc/3.
  183
  184goal_expansion(random_delay, Expanded) :-
  185    (   debugging(pengine(delay))
  186    ->  Expanded = do_random_delay
  187    ;   Expanded = true
  188    ).
  189
  190do_random_delay :-
  191    Delay is random(20)/1000,
  192    sleep(Delay).
  193
  194:- meta_predicate                       % internal meta predicates
  195    solve(+, ?, 0, +),
  196    findnsols_no_empty(+, ?, 0, -),
  197    pengine_event_loop(+, 1, +).
 pengine_create(:Options) is det
Creates a new pengine. Valid options are:
id(-ID)
ID gets instantiated to the id of the created pengine. ID is atomic.
alias(+Name)
The pengine is named Name (an atom). A slave pengine (child) can subsequently be referred to by this name.
application(+Application)
Application in which the pengine runs. See pengine_application/1.
server(+URL)
The pengine will run in (and in the Prolog context of) the pengine server located at URL.
src_list(+List_of_clauses)
Inject a list of Prolog clauses into the pengine.
src_text(+Atom_or_string)
Inject the clauses specified by a source text into the pengine.
src_url(+URL)
Inject the clauses specified in the file located at URL into the pengine.
src_predicates(+List)
Send the local predicates denoted by List to the remote pengine. List is a list of predicate indicators.

Remaining options are passed to http_open/3 (meaningful only for non-local pengines) and thread_create/3. Note that for thread_create/3 only options changing the stack-sizes can be used. In particular, do not pass the detached or alias options.

Successful creation of a pengine will return an event term of the following form:

create(ID, Term)
ID is the id of the pengine that was created. Term is not used at the moment.

An error will be returned if the pengine could not be created:

error(ID, Term)
ID is invalid, since no pengine was created. Term is the exception's error term. */
  252pengine_create(M:Options0) :-
  253    translate_local_sources(Options0, Options, M),
  254    (   select_option(server(BaseURL), Options, RestOptions)
  255    ->  remote_pengine_create(BaseURL, RestOptions)
  256    ;   local_pengine_create(Options)
  257    ).
 translate_local_sources(+OptionsIn, -Options, +Module) is det
Translate the src_predicates and src_list options into src_text. We need to do that anyway for remote pengines. For local pengines, we could avoid this step, but there is very little point in transferring source to a local pengine anyway as local pengines can access any Prolog predicate that you make visible to the application.

Multiple sources are concatenated to end up with a single src_text option.

  271translate_local_sources(OptionsIn, Options, Module) :-
  272    translate_local_sources(OptionsIn, Sources, Options2, Module),
  273    (   Sources == []
  274    ->  Options = Options2
  275    ;   Sources = [Source]
  276    ->  Options = [src_text(Source)|Options2]
  277    ;   atomics_to_string(Sources, Source)
  278    ->  Options = [src_text(Source)|Options2]
  279    ).
  280
  281translate_local_sources([], [], [], _).
  282translate_local_sources([H0|T], [S0|S], Options, M) :-
  283    nonvar(H0),
  284    translate_local_source(H0, S0, M),
  285    !,
  286    translate_local_sources(T, S, Options, M).
  287translate_local_sources([H|T0], S, [H|T], M) :-
  288    translate_local_sources(T0, S, T, M).
  289
  290translate_local_source(src_predicates(PIs), Source, M) :-
  291    must_be(list, PIs),
  292    with_output_to(string(Source),
  293                   maplist(list_in_module(M), PIs)).
  294translate_local_source(src_list(Terms), Source, _) :-
  295    must_be(list, Terms),
  296    with_output_to(string(Source),
  297                   forall(member(Term, Terms),
  298                          format('~k .~n', [Term]))).
  299translate_local_source(src_text(Source), Source, _).
  300
  301list_in_module(M, PI) :-
  302    listing(M:PI).
 pengine_send(+NameOrID, +Term) is det
Same as pengine_send(NameOrID, Term, []). */
  309pengine_send(Target, Event) :-
  310    pengine_send(Target, Event, []).
 pengine_send(+NameOrID, +Term, +Options) is det
Succeeds immediately and places Term in the queue of the pengine NameOrID. Options is a list of options:
delay(+Time)
The actual sending is delayed by Time seconds. Time is an integer or a float.

Any remaining options are passed to http_open/3. */

  325pengine_send(Target, Event, Options) :-
  326    must_be(atom, Target),
  327    pengine_send2(Target, Event, Options).
  328
  329pengine_send2(self, Event, Options) :-
  330    !,
  331    thread_self(Queue),
  332    delay_message(queue(Queue), Event, Options).
  333pengine_send2(Name, Event, Options) :-
  334    child(Name, Target),
  335    !,
  336    delay_message(pengine(Target), Event, Options).
  337pengine_send2(Target, Event, Options) :-
  338    delay_message(pengine(Target), Event, Options).
  339
  340delay_message(Target, Event, Options) :-
  341    option(delay(Delay), Options),
  342    !,
  343    alarm(Delay,
  344          send_message(Target, Event, Options),
  345          _AlarmID,
  346          [remove(true)]).
  347delay_message(Target, Event, Options) :-
  348    random_delay,
  349    send_message(Target, Event, Options).
  350
  351send_message(queue(Queue), Event, _) :-
  352    thread_send_message(Queue, pengine_request(Event)).
  353send_message(pengine(Pengine), Event, Options) :-
  354    (   pengine_remote(Pengine, Server)
  355    ->  remote_pengine_send(Server, Pengine, Event, Options)
  356    ;   pengine_thread(Pengine, Thread)
  357    ->  thread_send_message(Thread, pengine_request(Event))
  358    ;   existence_error(pengine, Pengine)
  359    ).
 pengine_request(-Request) is det
To be used by a pengine to wait for the next request. Such messages are placed in the queue by pengine_send/2. Keeps the thread in normal state if an event arrives within a second. Otherwise it waits for the idle_limit setting while using thread_idle/2 to minimise resources.
  369pengine_request(Request) :-
  370    thread_self(Me),
  371    thread_get_message(Me, pengine_request(Request), [timeout(1)]),
  372    !.
  373pengine_request(Request) :-
  374    pengine_self(Self),
  375    get_pengine_application(Self, Application),
  376    setting(Application:idle_limit, IdleLimit0),
  377    IdleLimit is IdleLimit0-1,
  378    thread_self(Me),
  379    (   thread_idle(thread_get_message(Me, pengine_request(Request),
  380                                       [timeout(IdleLimit)]),
  381                    long)
  382    ->  true
  383    ;   Request = destroy
  384    ).
 pengine_reply(+Event) is det
 pengine_reply(+Queue, +Event) is det
Reply Event to the parent of the current Pengine or the given Queue. Such events are read by the other side with pengine_event/2.

If the message cannot be sent within the idle_limit setting of the pengine, abort the pengine.

  397pengine_reply(Event) :-
  398    pengine_parent(Queue),
  399    pengine_reply(Queue, Event).
  400
  401pengine_reply(_Queue, _Event0) :-
  402    nb_current(pengine_idle_limit_exceeded, true),
  403    !.
  404pengine_reply(Queue, Event0) :-
  405    arg(1, Event0, ID),
  406    wrap_first_answer(ID, Event0, Event),
  407    random_delay,
  408    debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
  409    (   pengine_self(ID),
  410        \+ pengine_detached(ID, _)
  411    ->  get_pengine_application(ID, Application),
  412        setting(Application:idle_limit, IdleLimit),
  413        debug(pengine(reply), 'Sending ~p, timeout: ~q', [Event, IdleLimit]),
  414        (   thread_send_message(Queue, pengine_event(ID, Event),
  415                                [ timeout(IdleLimit)
  416                                ])
  417        ->  true
  418        ;   thread_self(Me),
  419            debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
  420                  [ID, Me]),
  421            nb_setval(pengine_idle_limit_exceeded, true),
  422            thread_detach(Me),
  423            abort
  424        )
  425    ;   thread_send_message(Queue, pengine_event(ID, Event))
  426    ).
  427
  428wrap_first_answer(ID, Event0, CreateEvent) :-
  429    wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]),
  430    arg(1, CreateEvent, ID),
  431    !,
  432    retract(wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)])).
  433wrap_first_answer(_ID, Event, Event).
  434
  435
  436empty_queue :-
  437    pengine_parent(Queue),
  438    empty_queue(Queue, 0, Discarded),
  439    debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).
  440
  441empty_queue(Queue, C0, C) :-
  442    thread_get_message(Queue, _Term, [timeout(0)]),
  443    !,
  444    C1 is C0+1,
  445    empty_queue(Queue, C1, C).
  446empty_queue(_, C, C).
 pengine_ask(+NameOrID, @Query, +Options) is det
Asks pengine NameOrID a query Query.

Options is a list of options:

template(+Template)
Template is a variable (or a term containing variables) shared with the query. By default, the template is identical to the query.
chunk(+IntegerOrFalse)
Retrieve solutions in chunks of Integer rather than one by one. 1 means no chunking (default). Other integers indicate the maximum number of solutions to retrieve in one chunk. If false, the Pengine goal is not executed using findall/3 and friends and we do not backtrack immediately over the goal. As a result, changes to backtrackable global state are retained. This is similar to using set_prolog_flag(toplevel_mode, recursive).
bindings(+Bindings)
Sets the global variable '$variable_names' to a list of Name = Var terms, providing access to the actual variable names.

Any remaining options are passed to pengine_send/3.

Note that the predicate pengine_ask/3 is deterministic, even for queries that have more than one solution. Also, the variables in Query will not be bound. Instead, results will be returned in the form of event terms.

success(ID, Terms, Projection, Time, More)
ID is the id of the pengine that succeeded in solving the query. Terms is a list holding instantiations of Template. Projection is a list of variable names that should be displayed. Time is the CPU time used to produce the results and finally, More is either true or false, indicating whether we can expect the pengine to be able to return more solutions or not, should we call pengine_next/2.
failure(ID)
ID is the id of the pengine that failed for lack of solutions.
error(ID, Term)
ID is the id of the pengine throwing the exception. Term is the exception's error term.
output(ID, Term)
ID is the id of a pengine running the query that called pengine_output/1. Term is the term that was passed in the first argument of pengine_output/1 when it was called.
prompt(ID, Term)
ID is the id of the pengine that called pengine_input/2 and Term is the prompt.

Defined in terms of pengine_send/3, like so:

pengine_ask(ID, Query, Options) :-
    partition(pengine_ask_option, Options, AskOptions, SendOptions),
    pengine_send(ID, ask(Query, AskOptions), SendOptions).

*/

  515pengine_ask(ID, Query, Options) :-
  516    partition(pengine_ask_option, Options, AskOptions, SendOptions),
  517    pengine_send(ID, ask(Query, AskOptions), SendOptions).
  518
  519
  % pengine_ask_option(?Option)
  % True for the options that pengine_ask/3 keeps in the ask(Query, AskOptions)
  % request.  Options that do not match are passed on to pengine_send/3.
  520pengine_ask_option(template(_)).
  521pengine_ask_option(chunk(_)).
  522pengine_ask_option(bindings(_)).
  523pengine_ask_option(breakpoints(_)).
 pengine_next(+NameOrID, +Options) is det
Asks pengine NameOrID for the next solution to a query started by pengine_ask/3. Defined options are:
chunk(+Count)
Modify the chunk-size to Count before asking the next set of solutions. This may not be used if the goal was started with chunk(false).

Remaining options are passed to pengine_send/3. The result of re-executing the current goal is returned to the caller's message queue in the form of event terms.

success(ID, Terms, Projection, Time, More)
See pengine_ask/3.
failure(ID)
ID is the id of the pengine that failed for lack of more solutions.
error(ID, Term)
ID is the id of the pengine throwing the exception. Term is the exception's error term.
output(ID, Term)
ID is the id of a pengine running the query that called pengine_output/1. Term is the term that was passed in the first argument of pengine_output/1 when it was called.
prompt(ID, Term)
ID is the id of the pengine that called pengine_input/2 and Term is the prompt.

Defined in terms of pengine_send/3, as follows:

pengine_next(ID, Options) :-
    pengine_send(ID, next, Options).

*/

  568pengine_next(ID, Options) :-
  569    select_option(chunk(Count), Options, Options1),
  570    !,
  571    pengine_send(ID, next(Count), Options1).
  572pengine_next(ID, Options) :-
  573    pengine_send(ID, next, Options).
 pengine_stop(+NameOrID, +Options) is det
Tells pengine NameOrID to stop looking for more solutions to a query started by pengine_ask/3. Options are passed to pengine_send/3.

Defined in terms of pengine_send/3, like so:

pengine_stop(ID, Options) :-
    pengine_send(ID, stop, Options).

*/

  589pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
 pengine_abort(+NameOrID) is det
Aborts the running query. The pengine goes back to state `2', waiting for new queries.
See also
- pengine_destroy/1. */
  600pengine_abort(Name) :-
  601    (   child(Name, Pengine)
  602    ->  true
  603    ;   Pengine = Name
  604    ),
  605    (   pengine_remote(Pengine, Server)
  606    ->  remote_pengine_abort(Server, Pengine, [])
  607    ;   pengine_thread(Pengine, Thread),
  608        debug(pengine(abort), 'Signalling thread ~p', [Thread]),
  609        catch(thread_signal(Thread, throw(abort_query)), _, true)
  610    ).
 pengine_destroy(+NameOrID) is det
 pengine_destroy(+NameOrID, +Options) is det
Destroys the pengine NameOrID. With the option force(true), the pengine is killed using abort/0 and pengine_destroy/2 succeeds. */
  620pengine_destroy(ID) :-
  621    pengine_destroy(ID, []).
  622
  623pengine_destroy(Name, Options) :-
  624    (   child(Name, ID)
  625    ->  true
  626    ;   ID = Name
  627    ),
  628    option(force(true), Options),
  629    !,
  630    (   pengine_thread(ID, Thread)
  631    ->  catch(thread_signal(Thread, abort),
  632              error(existence_error(thread, _), _), true)
  633    ;   true
  634    ).
  635pengine_destroy(ID, _) :-
  636    catch(pengine_send(ID, destroy),
  637          error(existence_error(pengine, ID), _),
  638          retractall(child(_,ID))).
  639
  640
  641/*================= pengines administration =======================
  642*/
 current_pengine(?Id, ?Parent, ?Location)
Dynamic predicate that registers our known pengines. Id is an atomic unique datatype. Parent is the id of our parent pengine. Location is one of
  % Registry of known pengines and their per-pengine data.  In
  % current_pengine/6 the third column is 0 for pengines registered through
  % pengine_register_remote/4; the second column is the queue handed to
  % pengine_register_local/6, or the registering thread for remote pengines.
  653:- dynamic
  654    current_pengine/6,              % Id, ParentId, Thread, URL, App, Destroy
  655    pengine_queue/4,                % Id, Queue, TimeOut, Time
  656    output_queue/3,                 % Id, Queue, Time
  657    pengine_user/2,                 % Id, User
  658    pengine_data/2,                 % Id, Data
  659    pengine_detached/2.             % Id, Data
  % Declared volatile as well: the same six predicates as above.
  660:- volatile
  661    current_pengine/6,
  662    pengine_queue/4,
  663    output_queue/3,
  664    pengine_user/2,
  665    pengine_data/2,
  666    pengine_detached/2.  667
  % child/2 is thread_local: each thread keeps its own name -> child table.
  668:- thread_local
  669    child/2.                        % ?Name, ?Child
 pengine_register_local(+Id, +Thread, +Queue, +URL, +App, +Destroy) is det
 pengine_register_remote(+Id, +URL, +App, +Destroy) is det
 pengine_unregister(+Id) is det
  675pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
  676    asserta(current_pengine(Id, Queue, Thread, URL, Application, Destroy)).
  677
  678pengine_register_remote(Id, URL, Application, Destroy) :-
  679    thread_self(Queue),
  680    asserta(current_pengine(Id, Queue, 0, URL, Application, Destroy)).
 pengine_unregister(+Id)
Called by the pengine thread destruction. If we are a remote pengine thread, our URL equals http and the queue is the message queue used to send events to the HTTP workers.
  688pengine_unregister(Id) :-
  689    thread_self(Me),
  690    (   current_pengine(Id, Queue, Me, http, _, _)
  691    ->  with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue))
  692    ;   true
  693    ),
  694    retractall(current_pengine(Id, _, Me, _, _, _)),
  695    retractall(pengine_user(Id, _)),
  696    retractall(pengine_data(Id, _)).
  697
  698pengine_unregister_remote(Id) :-
  699    retractall(current_pengine(Id, _Parent, 0, _, _, _)).
 pengine_self(-Id) is det
True if the current thread is a pengine with Id.
  705pengine_self(Id) :-
  706    thread_self(Thread),
  707    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy).
  708
  709pengine_parent(Parent) :-
  710    nb_getval(pengine_parent, Parent).
  711
  712pengine_thread(Pengine, Thread) :-
  713    current_pengine(Pengine, _Parent, Thread, _URL, _Application, _Destroy),
  714    Thread \== 0,
  715    !.
  716
  717pengine_remote(Pengine, URL) :-
  718    current_pengine(Pengine, _Parent, 0, URL, _Application, _Destroy).
  719
  720get_pengine_application(Pengine, Application) :-
  721    current_pengine(Pengine, _Parent, _, _URL, Application, _Destroy),
  722    !.
  723
  724get_pengine_module(Pengine, Pengine).
  725
  726:- if(current_predicate(uuid/2)).  727pengine_uuid(Id) :-
  728    uuid(Id, [version(4)]).             % Version 4 is random.
  729:- else.  730pengine_uuid(Id) :-
  731    (   current_prolog_flag(max_integer, Max1)
  732    ->  Max is Max1-1
  733    ;   Max is 1<<128
  734    ),
  735    random_between(0, Max, Num),
  736    atom_number(Id, Num).
  737:- endif.
 protect_pengine(+Id, :Goal) is semidet
Run Goal while protecting the Pengine Id from being destroyed. Used by the HTTP I/O routines to avoid that the Pengine's module disappears while I/O is in progress. We use a pool of locks because the lock may be held relatively long by output routines.

This also runs Goal if the Pengine no longer exists. This deals with Pengines terminated through destroy_or_continue/1.

bug
- After destroy_or_continue/1 takes the destroy route, the module may drop-out at any point in time, resulting in a possible crash. Seems the only safe way out is to do (de)serialization inside the Pengine.
  754:- meta_predicate protect_pengine(+, 0).  755
  756protect_pengine(Id, Goal) :-
  757    term_hash(Id, Hash),
  758    LockN is Hash mod 64,
  759    atom_concat(pengine_done_, LockN, Lock),
  760    with_mutex(Lock,
  761               (   pengine_thread(Id, _)
  762               ->  Goal
  763               ;   Goal
  764               )).
 pengine_application(+Application) is det
Directive that must be used to declare a pengine application module. The module must not be associated to any file. The default application is pengine_sandbox. The example below creates a new application address_book and imports the API defined in the module file address_book_api.pl into the application.
:- pengine_application(address_book).
:- use_module(address_book:address_book_api).

*/

  781pengine_application(Application) :-
  782    throw(error(context_error(nodirective,
  783                             pengine_application(Application)), _)).
  784
  785:- multifile
  786    system:term_expansion/2,
  787    current_application/1.
 current_pengine_application(?Application) is nondet
True when Application is a currently defined application.
See also
- pengine_application/1
  795current_pengine_application(Application) :-
  796    current_application(Application).
  797
  798
  799% Default settings for all applications
  800
  % Library-wide defaults.  The expansion of :- pengine_application(App)
  % declares a setting of the same name for App whose default is
  % setting(pengines:Name), i.e. the value given here.
  801:- setting(thread_pool_size, integer, 100,
  802           'Maximum number of pengines this application can run.').  803:- setting(thread_pool_stacks, list(compound), [],
  804           'Maximum stack sizes for pengines this application can run.').  805:- setting(slave_limit, integer, 3,
  806           'Maximum number of slave pengines a master pengine can create.').  807:- setting(time_limit, number, 300,
  808           'Maximum time to wait for output').  809:- setting(idle_limit, number, 300,
  810           'Pengine auto-destroys when idle for this time').  811:- setting(safe_goal_limit, number, 10,
  812           'Maximum time to try proving safety of the goal').  813:- setting(program_space, integer, 100_000_000,
  814           'Maximum memory used by predicates').  815:- setting(allow_from, list(atom), [*],
  816           'IP addresses from which remotes are allowed to connect').  817:- setting(deny_from, list(atom), [],
  818           'IP addresses from which remotes are NOT allowed to connect').  819:- setting(debug_info, boolean, false,
  820           'Keep information to support source-level debugging').  821
  822
  823system:term_expansion((:- pengine_application(Application)), Expanded) :-
  824    must_be(atom, Application),
  825    (   module_property(Application, file(_))
  826    ->  permission_error(create, pengine_application, Application)
  827    ;   true
  828    ),
  829    expand_term((:- setting(Application:thread_pool_size, integer,
  830                            setting(pengines:thread_pool_size),
  831                            'Maximum number of pengines this \c
  832                            application can run.')),
  833                ThreadPoolSizeSetting),
  834    expand_term((:- setting(Application:thread_pool_stacks, list(compound),
  835                            setting(pengines:thread_pool_stacks),
  836                            'Maximum stack sizes for pengines \c
  837                            this application can run.')),
  838                ThreadPoolStacksSetting),
  839    expand_term((:- setting(Application:slave_limit, integer,
  840                            setting(pengines:slave_limit),
  841                            'Maximum number of local slave pengines \c
  842                            a master pengine can create.')),
  843                SlaveLimitSetting),
  844    expand_term((:- setting(Application:time_limit, number,
  845                            setting(pengines:time_limit),
  846                            'Maximum time to wait for output')),
  847                TimeLimitSetting),
  848    expand_term((:- setting(Application:idle_limit, number,
  849                            setting(pengines:idle_limit),
  850                            'Pengine auto-destroys when idle for this time')),
  851                IdleLimitSetting),
  852    expand_term((:- setting(Application:safe_goal_limit, number,
  853                            setting(pengines:safe_goal_limit),
  854                            'Maximum time to try proving safety of the goal')),
  855                SafeGoalLimitSetting),
  856    expand_term((:- setting(Application:program_space, integer,
  857                            setting(pengines:program_space),
  858                            'Maximum memory used by predicates')),
  859                ProgramSpaceSetting),
  860    expand_term((:- setting(Application:allow_from, list(atom),
  861                            setting(pengines:allow_from),
  862                            'IP addresses from which remotes are allowed \c
  863                            to connect')),
  864                AllowFromSetting),
  865    expand_term((:- setting(Application:deny_from, list(atom),
  866                            setting(pengines:deny_from),
  867                            'IP addresses from which remotes are NOT \c
  868                            allowed to connect')),
  869                DenyFromSetting),
  870    expand_term((:- setting(Application:debug_info, boolean,
  871                            setting(pengines:debug_info),
  872                            'Keep information to support source-level \c
  873                            debugging')),
  874                DebugInfoSetting),
  875    flatten([ pengines:current_application(Application),
  876              ThreadPoolSizeSetting,
  877              ThreadPoolStacksSetting,
  878              SlaveLimitSetting,
  879              TimeLimitSetting,
  880              IdleLimitSetting,
  881              SafeGoalLimitSetting,
  882              ProgramSpaceSetting,
  883              AllowFromSetting,
  884              DenyFromSetting,
  885              DebugInfoSetting
  886            ], Expanded).
  887
  888% Register default application
  889
  890:- pengine_application(pengine_sandbox).
%!  pengine_property(?Pengine, ?Property) is nondet.
%
%   True when Property is a property of the given Pengine. Enumerates
%   all pengines that are known to the calling Prolog process. Defined
%   properties are:
%
%     - self(ID)
%       Identifier of the pengine. This is the same as the first
%       argument, and can be used to enumerate all known pengines.
%     - alias(Name)
%       Name is the alias name of the pengine, as provided through
%       the `alias` option when creating the pengine.
%     - thread(Thread)
%       If the pengine is a local pengine, Thread is the Prolog
%       thread identifier of the pengine.
%     - remote(Server)
%       If the pengine is remote, the URL of the server.
%     - application(Application)
%       Pengine runs the given application.
%     - module(Module)
%       Temporary module used for running the Pengine.
%     - destroy(Destroy)
%       Destroy is `true` if the pengine is destroyed automatically
%       after completing the query.
%     - parent(Queue)
%       Message queue to which the (local) pengine reports.
%     - source(?SourceID, ?Source)
%       Source is the source code with the given SourceID. May be
%       present if the setting `debug_info` is present.
%     - detached(?Time)
%       Pengine was detached at Time.
%
%   When both arguments are instantiated on entry, the call commits
%   to the first matching solution.

pengine_property(Pengine, Property) :-
    (   nonvar(Pengine),
        nonvar(Property)
    ->  once(pengine_property2(Property, Pengine))
    ;   pengine_property2(Property, Pengine)
    ).
  933
  934pengine_property2(self(Id), Id) :-
  935    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
  936pengine_property2(module(Id), Id) :-
  937    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
  938pengine_property2(alias(Alias), Id) :-
  939    child(Alias, Id),
  940    Alias \== Id.
  941pengine_property2(thread(Thread), Id) :-
  942    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy),
  943    Thread \== 0.
  944pengine_property2(remote(Server), Id) :-
  945    current_pengine(Id, _Parent, 0, Server, _Application, _Destroy).
  946pengine_property2(application(Application), Id) :-
  947    current_pengine(Id, _Parent, _Thread, _Server, Application, _Destroy).
  948pengine_property2(destroy(Destroy), Id) :-
  949    current_pengine(Id, _Parent, _Thread, _Server, _Application, Destroy).
  950pengine_property2(parent(Parent), Id) :-
  951    current_pengine(Id, Parent, _Thread, _URL, _Application, _Destroy).
  952pengine_property2(source(SourceID, Source), Id) :-
  953    pengine_data(Id, source(SourceID, Source)).
  954pengine_property2(detached(When), Id) :-
  955    pengine_detached(Id, When).
%!  pengine_output(+Term) is det.
%
%   Sends Term to the parent pengine or thread, wrapped in an
%   output(Self, Term) event where Self is the calling pengine.

pengine_output(Term) :-
    pengine_self(Self),
    Event = output(Self, Term),
    pengine_reply(Event).
%!  pengine_debug(+Format, +Args) is det.
%
%   Create a message using format/3 from Format and Args and send this
%   to the client. The default JavaScript client will call
%   `console.log(Message)` if there is a console. The predicate
%   pengine_rpc/3 calls debug(pengine(debug), '~w', [Message]). The
%   debug topic pengine(debug) is enabled by default.
%
%   The format call is first validated using safe_goal/1. If that
%   raises an exception, the exception is translated to a string and
%   sent instead of the formatted message.
%
%   @see debug/1 and nodebug/1 for controlling the pengine(debug) topic
%   @see format/2 for format specifications

pengine_debug(Format, Args) :-
    pengine_parent(Parent),
    pengine_self(Me),
    pengine_debug_text(Format, Args, Text),
    pengine_reply(Parent, debug(Me, Text)).

%   pengine_debug_text(+Format, +Args, -Text)
%
%   Text is the formatted message, or the rendered sandbox error if
%   the format call is not considered safe.

pengine_debug_text(Format, Args, Text) :-
    catch(safe_goal(format(atom(_), Format, Args)), Error, true),
    (   nonvar(Error)
    ->  message_to_string(Error, Text)
    ;   format(atom(Text), Format, Args)
    ).
  988
  989
  990/*================= Local pengine =======================
  991*/
%!  local_pengine_create(+Options)
%
%   Creates a local Pengine, which is a thread running pengine_main/3.
%   The application defaults to `pengine_sandbox`. The new pengine is
%   recorded in child/2 under its alias, where the alias defaults to
%   the identifier of the created pengine.

local_pengine_create(Options) :-
    option(application(App), Options, pengine_sandbox),
    thread_self(Me),
    create(Me, Id, Options, local, App),
    option(alias(Alias), Options, Id),
    assert(child(Alias, Id)).
%!  thread_pool:create_pool(+Application) is semidet.
%
%   On demand creation of a thread pool for a pengine application.
%   Fails if Application is not a known pengine application. Pool size
%   and stack limits are taken from the application's settings
%   `thread_pool_size` and `thread_pool_stacks`.

:- multifile
    thread_pool:create_pool/1.

thread_pool:create_pool(App) :-
    current_application(App),
    setting(App:thread_pool_size, PoolSize),
    setting(App:thread_pool_stacks, StackOptions),
    thread_pool_create(App, PoolSize, StackOptions).
%!  create(+Queue, -Child, +Options, +URL, +Application) is det.
%
%   Create a new pengine thread.
%
%   For a `local` pengine, errors from create0/5 propagate to the
%   caller. Otherwise they are caught and reported to Queue as an
%   error(Child, Error) event.
%
%   @arg Queue is the queue (or thread handle) to report to
%   @arg Child is the identifier of the created pengine.
%   @arg URL is one of `local` or `http`

create(Queue, Child, Options, URL, Application) :-
    pengine_child_id(Child),
    (   URL = local
    ->  create0(Queue, Child, Options, local, Application)
    ;   catch(create0(Queue, Child, Options, URL, Application),
              Error,
              create_error(Queue, Child, Error))
    ).
 1039
 1040pengine_child_id(Child) :-
 1041    (   nonvar(Child)
 1042    ->  true
 1043    ;   pengine_uuid(Child)
 1044    ).
 1045
 1046create_error(Queue, Child, Error) :-
 1047    pengine_reply(Queue, error(Child, Error)).
 1048
 1049create0(Queue, Child, Options, URL, Application) :-
 1050    (  current_application(Application)
 1051    -> true
 1052    ;  existence_error(pengine_application, Application)
 1053    ),
 1054    (   URL \== http                    % pengine is _not_ a child of the
 1055                                        % HTTP server thread
 1056    ->  aggregate_all(count, child(_,_), Count),
 1057        setting(Application:slave_limit, Max),
 1058        (   Count >= Max
 1059        ->  throw(error(resource_error(max_pengines), _))
 1060        ;   true
 1061        )
 1062    ;   true
 1063    ),
 1064    partition(pengine_create_option, Options, PengineOptions, RestOptions),
 1065    thread_create_in_pool(
 1066        Application,
 1067        pengine_main(Queue, PengineOptions, Application), ChildThread,
 1068        [ at_exit(pengine_done)
 1069        | RestOptions
 1070        ]),
 1071    option(destroy(Destroy), PengineOptions, true),
 1072    pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy),
 1073    thread_send_message(ChildThread, pengine_registered(Child)),
 1074    (   option(id(Id), Options)
 1075    ->  Id = Child
 1076    ;   true
 1077    ).
 1078
%   pengine_create_option(?Option)
%
%   True when Option is an option that is handled by the pengine
%   itself. Used with partition/4 to separate these options from the
%   remaining ones, which create0/5 passes on as thread-creation
%   options and remote_pengine_create/2 passes on to the HTTP request.

pengine_create_option(src_text(_)).
pengine_create_option(src_url(_)).
pengine_create_option(application(_)).
pengine_create_option(destroy(_)).
pengine_create_option(ask(_)).
pengine_create_option(template(_)).
pengine_create_option(bindings(_)).
pengine_create_option(chunk(_)).
pengine_create_option(alias(_)).
pengine_create_option(user(_)).
%!  pengine_done is det.
%
%   Called from the pengine thread `at_exit` option. Destroys _child_
%   pengines using pengine_destroy/1. Cleaning up the Pengine is
%   synchronised by the `pengine_done` mutex. See read_event/6.
%
%   If the thread terminated on `'$aborted'`, the parent is first
%   informed using a destroy(Pengine, abort(Pengine)) event.

:- public
    pengine_done/0.

pengine_done :-
    thread_self(Me),
    report_if_aborted(Me),
    forall(child(_Alias, Child),
           pengine_destroy(Child)),
    pengine_self(Id),
    protect_pengine(Id, pengine_unregister(Id)).

%   report_if_aborted(+Thread)
%
%   If Thread was aborted, detach it and tell the parent. Errors
%   while sending the event are ignored.

report_if_aborted(Thread) :-
    (   thread_property(Thread, status(exception('$aborted'))),
        thread_detach(Thread),
        pengine_self(Pengine)
    ->  catch(pengine_reply(destroy(Pengine, abort(Pengine))),
              error(_,_), true)
    ;   true
    ).
%!  pengine_main(+Parent, +Options, +Application)
%
%   Entry point of a pengine thread. It redirects output if needed
%   (fix_streams/0), waits for the pengine_registered(Id) message that
%   tells it its identifier, stores Parent in the global variable
%   `pengine_parent`, registers the user and then runs
%   pengine_create_and_loop/3 inside a temporary module named after
%   the pengine, using pengine_prepare_source/2 as setup. If the setup
%   throws `prepare_source_failed`, the pengine is terminated.

:- thread_local wrap_first_answer_in_create_event/2.

:- meta_predicate
    pengine_prepare_source(:, +).

pengine_main(Parent, Options, App) :-
    fix_streams,
    thread_get_message(pengine_registered(Pengine)),
    nb_setval(pengine_parent, Parent),
    pengine_register_user(Options),
    set_prolog_flag(mitigate_spectre, true),
    catch(in_temporary_module(Pengine,
                              pengine_prepare_source(App, Options),
                              pengine_create_and_loop(Pengine, App, Options)),
          prepare_source_failed,
          pengine_terminate(Pengine)).
 1137
 1138pengine_create_and_loop(Self, Application, Options) :-
 1139    setting(Application:slave_limit, SlaveLimit),
 1140    CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
 1141    (   option(ask(Query0), Options)
 1142    ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
 1143        (   string(Query0)                      % string is not callable
 1144        ->  (   option(template(TemplateS), Options)
 1145            ->  Ask2 = Query0-TemplateS
 1146            ;   Ask2 = Query0
 1147            ),
 1148            catch(ask_to_term(Ask2, Self, Query, Template, Bindings),
 1149                  Error, true),
 1150            (   var(Error)
 1151            ->  true
 1152            ;   send_error(Error),
 1153                throw(prepare_source_failed)
 1154            )
 1155        ;   Query = Query0,
 1156            option(template(Template), Options, Query),
 1157            option(bindings(Bindings), Options, [])
 1158        ),
 1159        option(chunk(Chunk), Options, 1),
 1160        pengine_ask(Self, Query,
 1161                    [ template(Template),
 1162                      chunk(Chunk),
 1163                      bindings(Bindings)
 1164                    ])
 1165    ;   Extra = [],
 1166        pengine_reply(CreateEvent)
 1167    ),
 1168    pengine_main_loop(Self).
%!  ask_to_term(+AskSpec, +Module, -Ask, -Template, -Bindings) is det.
%
%   Translate the AskSpec into a query, template and bindings. The
%   trick is that we must parse using the operator declarations of the
%   source and we must make sure variable sharing between query and
%   answer template are known.
%
%   AskSpec is either `AskText-TemplateText` or just `AskText`. In the
%   first case both texts are parsed as one term such that they share
%   variables, and Bindings holds the bindings for the variables of
%   the template. In the second case the template is a dict with tag
%   `swish_default_template` created from the bindings that pass the
%   anon/1 filter.

ask_to_term(AskText-TemplateText, Module, Query, Template, Bindings) :-
    !,
    format(string(Combined), 't((~s),(~s))', [TemplateText, AskText]),
    ReadOptions = [ variable_names(AllBindings),
                    module(Module)
                  ],
    term_string(t(Template,Query), Combined, ReadOptions),
    phrase(template_bindings(Template, AllBindings), Bindings).
ask_to_term(AskText, Module, Query, Template, Bindings) :-
    ReadOptions = [ variable_names(AllBindings),
                    module(Module)
                  ],
    term_string(Query, AskText, ReadOptions),
    exclude(anon, AllBindings, Bindings),
    dict_create(Template, swish_default_template, Bindings).
 1193
 1194template_bindings(Var, Bindings) -->
 1195    { var(Var) }, !,
 1196    (   { var_binding(Bindings, Var, Binding)
 1197        }
 1198    ->  [Binding]
 1199    ;   []
 1200    ).
 1201template_bindings([H|T], Bindings) -->
 1202    !,
 1203    template_bindings(H, Bindings),
 1204    template_bindings(T, Bindings).
 1205template_bindings(Compoound, Bindings) -->
 1206    { compound(Compoound), !,
 1207      compound_name_arguments(Compoound, _, Args)
 1208    },
 1209    template_bindings(Args, Bindings).
 1210template_bindings(_, _) --> [].
 1211
 1212var_binding(Bindings, Var, Binding) :-
 1213    member(Binding, Bindings),
 1214    arg(2, Binding, V),
 1215    V == Var, !.
%!  fix_streams is det.
%
%   If we are a pengine that is created from a web server thread, the
%   current output points to a CGI stream. In that case the alias
%   `current_output` is re-bound to `user_output`.

fix_streams :-
    fix_stream(current_output).

%   fix_stream(+Alias) is det.
%
%   Re-bind Alias to user_output if it currently refers to a CGI
%   stream. Otherwise do nothing.

fix_stream(Alias) :-
    (   is_cgi_stream(Alias)
    ->  debug(pengine(stream), '~w is a CGI stream!', [Alias]),
        set_stream(user_output, alias(Alias))
    ;   true
    ).
%!  pengine_prepare_source(:Application, +Options) is det.
%
%   Load the source into the pengine's module. The module qualifier
%   of the first argument is the pengine's (temporary) module. This
%   module gets the `program_space` limit of Application, has `user`
%   removed from its import modules and Application added as first
%   import module. Then prep_module/3 is called; if that raises an
%   exception, the error is sent to the parent.
%
%   @throws prepare_source_failed if it failed to prepare the sources.

pengine_prepare_source(Module:Application, Options) :-
    setting(Application:program_space, MaxSpace),
    set_module(Module:program_space(MaxSpace)),
    delete_import_module(Module, user),
    add_import_module(Module, Application, start),
    catch(prep_module(Module, Application, Options), Error, true),
    (   nonvar(Error)
    ->  send_error(Error),
        throw(prepare_source_failed)
    ;   true
    ).
 1250
 1251prep_module(Module, Application, Options) :-
 1252    maplist(copy_flag(Module, Application), [var_prefix]),
 1253    forall(prepare_module(Module, Application, Options), true),
 1254    setup_call_cleanup(
 1255        '$set_source_module'(OldModule, Module),
 1256        maplist(process_create_option(Module), Options),
 1257        '$set_source_module'(OldModule)).
 1258
 1259copy_flag(Module, Application, Flag) :-
 1260    current_prolog_flag(Application:Flag, Value),
 1261    !,
 1262    set_prolog_flag(Module:Flag, Value).
 1263copy_flag(_, _, _).
 1264
 1265process_create_option(Application, src_text(Text)) :-
 1266    !,
 1267    pengine_src_text(Text, Application).
 1268process_create_option(Application, src_url(URL)) :-
 1269    !,
 1270    pengine_src_url(URL, Application).
 1271process_create_option(_, _).
 prepare_module(+Module, +Application, +Options) is semidet
Hook, called to initialize the temporary private module that provides the working context of a pengine. This hook is executed by the pengine's thread. Preparing the source consists of three steps:
  1. Add Application as (first) default import module for Module
  2. Call this hook
   3. Compile the source provided by the src_text and src_url options
Arguments:
Module- is a new temporary module (see in_temporary_module/3) that may be (further) prepared by this hook.
Application- (also a module) associated to the pengine.
Options- is passed from the environment and should (currently) be ignored.
 1294pengine_main_loop(ID) :-
 1295    catch(guarded_main_loop(ID), abort_query, pengine_aborted(ID)).
 1296
 1297pengine_aborted(ID) :-
 1298    thread_self(Self),
 1299    debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Self]),
 1300    empty_queue,
 1301    destroy_or_continue(abort(ID)).
%!  guarded_main_loop(+Pengine) is det.
%
%   Executes state `2' of the pengine, where it waits for two events:
%
%     - destroy
%       Terminate the pengine
%     - ask(:Goal, +Options)
%       Solve Goal.
%
%   Any other request is answered with a protocol error, after which
%   the pengine waits for the next request.

guarded_main_loop(ID) :-
    pengine_request(Request),
    main_loop_request(Request, ID).

%   main_loop_request(+Request, +ID)
%
%   Dispatch a request received in state `2'.

main_loop_request(destroy, ID) :-
    !,
    debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
    pengine_terminate(ID).
main_loop_request(ask(Goal, Options), ID) :-
    !,
    debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
    ask(ID, Goal, Options).
main_loop_request(_Unknown, ID) :-
    debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
    pengine_reply(error(ID, error(protocol_error, _))),
    guarded_main_loop(ID).
 1326
 1327
 1328pengine_terminate(ID) :-
 1329    pengine_reply(destroy(ID)),
 1330    thread_self(Me),            % Make the thread silently disappear
 1331    thread_detach(Me).
%!  solve(+Chunk, +Template, :Goal, +ID) is det.
%
%   Solve Goal. Note that because we can ask for a new goal in state
%   `6', we must provide for an ancestral cut (prolog_cut_to/1). We
%   need to be sure to have a choice point before we can call
%   prolog_current_choice/1. This is the reason why this predicate has
%   two clauses.
%
%   Chunk is either an integer (number of solutions per reply) or
%   `false` (one solution per reply, no chunking).
%
%   @error domain_error(chunk, Chunk) for any other value of Chunk.

solve(Chunk, Template, Goal, ID) :-
    prolog_current_choice(Choice),
    % State and Time are compound terms such that more_solutions/5
    % can update them destructively using nb_setarg/3.
    (   integer(Chunk)
    ->  State = count(Chunk)
    ;   Chunk == false
    ->  State = no_chunk
    ;   domain_error(chunk, Chunk)
    ),
    statistics(cputime, Epoch),
    Time = time(Epoch),
    nb_current('$variable_names', Bindings),
    filter_template(Template, Bindings, Template2),
    '$current_typein_module'(CurrTypeIn),
    (   '$set_typein_module'(ID),
        % query_done/2 binds Det to `true` and restores the typein
        % module. Det is still unbound if the goal left a choice point.
        call_cleanup(catch(findnsols_no_empty(State, Template2,
                                              set_projection(Goal, Bindings),
                                              Result),
                           Error, true),
                     query_done(Det, CurrTypeIn)),
        arg(1, Time, T0),
        statistics(cputime, T1),
        CPUTime is T1-T0,
        (   var(Error)
        ->  projection(Projection),
            (   var(Det)
            ->  pengine_reply(success(ID, Result, Projection,
                                      CPUTime, true)),
                more_solutions(ID, Choice, State, Time)
            ;   !,                      % commit
                destroy_or_continue(success(ID, Result, Projection,
                                            CPUTime, false))
            )
        ;   !,                          % commit
            (   Error == abort_query
            ->  throw(Error)
            ;   destroy_or_continue(error(ID, Error))
            )
        )
    ;   !,                              % commit
        % The goal has no (more) solutions.
        arg(1, Time, T0),
        statistics(cputime, T1),
        CPUTime is T1-T0,
        destroy_or_continue(failure(ID, CPUTime))
    ).
solve(_, _, _, _).                      % leave a choice point
 1387
 1388query_done(true, CurrTypeIn) :-
 1389    '$set_typein_module'(CurrTypeIn).
%!  set_projection(:Goal, +Bindings)
%
%   findnsols_no_empty/4 copies its goal and template to avoid
%   instantiation thereof when it stops after finding N solutions.
%   Using this helper we can obtain a renamed version of Bindings that
%   we can set: Bindings is stored in the backtrackable global
%   variable `$variable_names` before Goal is called.

set_projection(Goal, VarNames) :-
    b_setval('$variable_names', VarNames),
    call(Goal).
 1401
 1402projection(Projection) :-
 1403    nb_current('$variable_names', Bindings),
 1404    !,
 1405    maplist(var_name, Bindings, Projection).
 1406projection([]).
%!  filter_template(+Template0, +Bindings, -Template) is det.
%
%   Establish the final template. This is there because hooks such as
%   goal_expansion/2 and the SWISH query hooks can modify the set of
%   bindings. A dict with tag `swish_default_template` is rebuilt from
%   Bindings; any other template is returned unchanged.
%
%   @bug Projection and template handling is pretty messy.

filter_template(Template0, Bindings, Template) :-
    (   is_dict(Template0, swish_default_template)
    ->  dict_create(Template, swish_default_template, Bindings)
    ;   Template = Template0
    ).
 1421
 1422findnsols_no_empty(no_chunk, Template, Goal, List) =>
 1423    List = [Template],
 1424    call(Goal).
 1425findnsols_no_empty(State, Template, Goal, List) =>
 1426    findnsols(State, Template, Goal, List),
 1427    List \== [].
 1428
 1429destroy_or_continue(Event) :-
 1430    arg(1, Event, ID),
 1431    (   pengine_property(ID, destroy(true))
 1432    ->  thread_self(Me),
 1433        thread_detach(Me),
 1434        pengine_reply(destroy(ID, Event))
 1435    ;   pengine_reply(Event),
 1436        guarded_main_loop(ID)
 1437    ).
%!  more_solutions(+Pengine, +Choice, +State, +Time)
%
%   Called after a solution was found while there can be more. This
%   is state `6' of the state machine. It processes these events:
%
%     - stop
%       Go back via state `7' to state `2' (guarded_main_loop/1)
%     - next
%       Fail. This causes solve/4 to backtrack on the goal asked,
%       providing at most the current `chunk` solutions.
%     - next(Count)
%       As `next`, but sets the new chunk-size to Count.
%     - ask(Goal, Options)
%       Ask another goal. Note that we must commit the choice point
%       of the previous goal asked for.

more_solutions(ID, Choice, State, Time) :-
    pengine_request(Request),
    more_solutions(Request, ID, Choice, State, Time).
 1458
 1459more_solutions(stop, ID, _Choice, _State, _Time) :-
 1460    !,
 1461    debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
 1462    destroy_or_continue(stop(ID)).
 1463more_solutions(next, ID, _Choice, _State, Time) :-
 1464    !,
 1465    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
 1466    statistics(cputime, T0),
 1467    nb_setarg(1, Time, T0),
 1468    fail.
 1469more_solutions(next(Count), ID, _Choice, State, Time) :-
 1470    Count > 0,
 1471    State = count(_),                   % else fallthrough to protocol error
 1472    !,
 1473    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
 1474    nb_setarg(1, State, Count),
 1475    statistics(cputime, T0),
 1476    nb_setarg(1, Time, T0),
 1477    fail.
 1478more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
 1479    !,
 1480    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
 1481    prolog_cut_to(Choice),
 1482    ask(ID, Goal, Options).
 1483more_solutions(destroy, ID, _Choice, _State, _Time) :-
 1484    !,
 1485    debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
 1486    pengine_terminate(ID).
 1487more_solutions(Event, ID, Choice, State, Time) :-
 1488    debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]),
 1489    pengine_reply(error(ID, error(protocol_error, _))),
 1490    more_solutions(ID, Choice, State, Time).
%!  ask(+Pengine, :Goal, +Options)
%
%   Migrate from state `2' to `3'. This predicate validates that it
%   is safe to call Goal using safe_goal/1 (through prepare_goal/4)
%   and then calls solve/4 to prove the goal. It takes care of the
%   chunk(N) option. If preparing the goal raises an exception, the
%   error is reported and the pengine returns to guarded_main_loop/1.

ask(ID, Goal, Options) :-
    catch(prepare_goal(ID, Goal, Prepared, Options), Error, true),
    !,
    (   nonvar(Error)
    ->  pengine_reply(error(ID, Error)),
        guarded_main_loop(ID)
    ;   option(template(Template), Options, Goal),
        option(chunk(Chunk), Options, 1),
        solve(Chunk, Template, Prepared, ID)
    ).
%!  prepare_goal(+Pengine, +GoalIn, -GoalOut, +Options) is det.
%
%   Prepare GoalIn for execution in Pengine. This implies we must
%   perform goal expansion and, if the system is sandboxed, check the
%   sandbox.
%
%   Note that expand_goal(Module:GoalIn, GoalOut) is what we'd like
%   to write, but this does not work correctly if the user wishes to
%   expand `X:Y` while interpreting `X` not as the module in which to
%   run `Y`. This happens in the CQL package. Possibly we should
%   disallow this reinterpretation?
%
%   @error sandbox(time_limit_exceeded, Limit) if the safety of the
%   goal could not be established within the `safe_goal_limit`
%   setting of the application.

prepare_goal(ID, Goal0, Module:Goal, Options) :-
    option(bindings(Bindings), Options, []),
    b_setval('$variable_names', Bindings),
    % Run the prepare_goal/3 hook; use the goal unmodified if it fails.
    (   prepare_goal(Goal0, Goal1, Options)
    ->  true
    ;   Goal1 = Goal0
    ),
    get_pengine_module(ID, Module),
    % Expand the goal with the pengine's module as source module.
    setup_call_cleanup(
        '$set_source_module'(Old, Module),
        expand_goal(Goal1, Goal),
        '$set_source_module'(_, Old)),
    (   pengine_not_sandboxed(ID)
    ->  true
    ;   get_pengine_application(ID, App),
        setting(App:safe_goal_limit, Limit),
        catch(call_with_time_limit(
                  Limit,
                  safe_goal(Module:Goal)), E, true)
    ->  (   var(E)
        ->  true
        ;   E = time_limit_exceeded
        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
        ;   throw(E)
        )
    ).
 prepare_goal(+Goal0, -Goal1, +Options) is semidet
Pre-preparation hook for running Goal0. The hook runs in the context of the pengine. Goal is the raw goal given to ask. The returned Goal1 is subject to goal expansion (expand_goal/2) and sandbox validation (safe_goal/1) prior to execution. If this goal fails, Goal0 is used for further processing.
Arguments:
Options- provides the options as given to ask
%!  pengine_not_sandboxed(+Pengine) is semidet.
%
%   True when pengine does not operate in sandboxed mode. This implies
%   a user must be registered by authentication_hook/3 and the hook
%   pengines:not_sandboxed(User, Application) must succeed.

pengine_not_sandboxed(ID) :-
    once(( pengine_user(ID, User),
           pengine_property(ID, application(Application)),
           not_sandboxed(User, Application)
         )).
 not_sandboxed(+User, +Application) is semidet
 This hook is called to see whether the Pengine must be executed in a protected environment. It is only called after authentication_hook/3 has confirmed the authenticity of the current user. If this hook succeeds, both loading the code and executing the query are executed without enforcing sandbox security. Typically, one should:
  1. Provide a safe user authentication hook.
  2. Enable HTTPS in the server or put it behind an HTTPS proxy and ensure that the network between the proxy and the pengine server can be trusted.
%!  pengine_pull_response(+Pengine, +Options) is det.
%
%   Pulls a response (an event term) from the slave Pengine if
%   Pengine is a remote process, else does nothing at all.

pengine_pull_response(Pengine, Options) :-
    (   pengine_remote(Pengine, Server)
    ->  remote_pengine_pull_response(Server, Pengine, Options)
    ;   true
    ).
%!  pengine_input(+Prompt, -Term) is det.
%
%   Sends Prompt to the master (parent) pengine and waits for input.
%   Note that Prompt may be any term, compound as well as atomic.
%
%   The reply must be an input(Term) request. A `destroy` request
%   aborts the pengine; anything else raises a protocol error.
%
%   @error protocol_error

pengine_input(Prompt, Term) :-
    pengine_self(Self),
    pengine_parent(Parent),
    pengine_reply(Parent, prompt(Self, Prompt)),
    pengine_request(Request),
    input_request(Request, Term).

%   input_request(+Request, -Term)
%
%   Interpret the request received in response to a prompt.

input_request(input(Input), Term) :-
    !,
    Term = Input.
input_request(Request, _Term) :-
    Request == destroy,
    !,
    abort.
input_request(_Request, _Term) :-
    throw(error(protocol_error,_)).
%!  pengine_respond(+Pengine, +Input, +Options) is det.
%
%   Sends a response in the form of the term Input to a slave (child)
%   pengine that has prompted its master (parent) for input.
%
%   Defined in terms of pengine_send/3, as follows:
%
%   ==
%   pengine_respond(Pengine, Input, Options) :-
%       pengine_send(Pengine, input(Input), Options).
%   ==

pengine_respond(Pengine, Input, Options) :-
    Event = input(Input),
    pengine_send(Pengine, Event, Options).
%!  send_error(+Error) is det.
%
%   Send an error to my parent. Remove non-readable blobs from the
%   error term first using replace_blobs/2. If the error contains a
%   stack-trace, this is resolved to a string before sending.

send_error(Error) :-
    (   Error = error(Formal, context(prolog_stack(Frames), Message)),
        is_list(Frames)
    ->  with_output_to(string(Stack),
                       print_prolog_backtrace(current_output, Frames)),
        pengine_self(Self),
        replace_blobs(Formal, SafeFormal),
        replace_blobs(Message, SafeMessage),
        Context = context(prolog_stack(Stack), SafeMessage),
        pengine_reply(error(Self, error(SafeFormal, Context)))
    ;   pengine_self(Self),
        replace_blobs(Error, SafeError),
        pengine_reply(error(Self, SafeError))
    ).
%!  replace_blobs(Term0, Term) is det.
%
%   Copy Term0 to Term, replacing non-text blobs. This is required
%   for error messages that may hold streams and other handles to
%   non-readable objects. Each such blob is replaced by an atom
%   holding its printed (`~p`) representation.

replace_blobs(Term0, Term) :-
    (   blob(Term0, Type),
        Type \== text
    ->  format(atom(Term), '~p', [Term0])
    ;   compound(Term0)
    ->  compound_name_arguments(Term0, Name, Args0),
        maplist(replace_blobs, Args0, Args),
        compound_name_arguments(Term, Name, Args)
    ;   Term = Term0
    ).
 1674
 1675
 1676/*================= Remote pengines =======================
 1677*/
 1678
 1679
 1680remote_pengine_create(BaseURL, Options) :-
 1681    partition(pengine_create_option, Options, PengineOptions0, RestOptions),
 1682        (       option(ask(Query), PengineOptions0),
 1683                \+ option(template(_Template), PengineOptions0)
 1684        ->      PengineOptions = [template(Query)|PengineOptions0]
 1685        ;       PengineOptions = PengineOptions0
 1686        ),
 1687    options_to_dict(PengineOptions, PostData),
 1688    remote_post_rec(BaseURL, create, PostData, Reply, RestOptions),
 1689    arg(1, Reply, ID),
 1690    (   option(id(ID2), Options)
 1691    ->  ID = ID2
 1692    ;   true
 1693    ),
 1694    option(alias(Name), Options, ID),
 1695    assert(child(Name, ID)),
 1696    (   (   functor(Reply, create, _)   % actually created
 1697        ;   functor(Reply, output, _)   % compiler messages
 1698        )
 1699    ->  option(application(Application), PengineOptions, pengine_sandbox),
 1700        option(destroy(Destroy), PengineOptions, true),
 1701        pengine_register_remote(ID, BaseURL, Application, Destroy)
 1702    ;   true
 1703    ),
 1704    thread_self(Queue),
 1705    pengine_reply(Queue, Reply).
 1706
 1707options_to_dict(Options, Dict) :-
 1708    select_option(ask(Ask), Options, Options1),
 1709    select_option(template(Template), Options1, Options2),
 1710    !,
 1711    no_numbered_var_in(Ask+Template),
 1712    findall(AskString-TemplateString,
 1713            ask_template_to_strings(Ask, Template, AskString, TemplateString),
 1714            [ AskString-TemplateString ]),
 1715    options_to_dict(Options2, Dict0),
 1716    Dict = Dict0.put(_{ask:AskString,template:TemplateString}).
 1717options_to_dict(Options, Dict) :-
 1718    maplist(prolog_option, Options, Options1),
 1719    dict_create(Dict, _, Options1).
 1720
 1721no_numbered_var_in(Term) :-
 1722    sub_term(Sub, Term),
 1723    subsumes_term('$VAR'(_), Sub),
 1724    !,
 1725    domain_error(numbered_vars_free_term, Term).
 1726no_numbered_var_in(_).
 1727
 1728ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
 1729    numbervars(Ask+Template, 0, _),
 1730    WOpts = [ numbervars(true), ignore_ops(true), quoted(true) ],
 1731    format(string(AskTemplate), '~W\n~W', [ Ask, WOpts,
 1732                                            Template, WOpts
 1733                                          ]),
 1734    split_string(AskTemplate, "\n", "", [AskString, TemplateString]).
 1735
 1736prolog_option(Option0, Option) :-
 1737    create_option_type(Option0, term),
 1738    !,
 1739    Option0 =.. [Name,Value],
 1740    format(string(String), '~k', [Value]),
 1741    Option =.. [Name,String].
 1742prolog_option(Option, Option).
 1743
%   create_option_type(?Option, ?Type)
%
%   Type of the value of a pengine creation option. Used by
%   prolog_option/2, which serialises values of type `term` to a
%   string.

create_option_type(ask(_),         term).
create_option_type(template(_),    term).
create_option_type(application(_), atom).
 1747
 1748remote_pengine_send(BaseURL, ID, Event, Options) :-
 1749    remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options),
 1750    thread_self(Queue),
 1751    pengine_reply(Queue, Reply).
 1752
 1753remote_pengine_pull_response(BaseURL, ID, Options) :-
 1754    remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options),
 1755    thread_self(Queue),
 1756    pengine_reply(Queue, Reply).
 1757
 1758remote_pengine_abort(BaseURL, ID, Options) :-
 1759    remote_send_rec(BaseURL, abort, ID, [], Reply, Options),
 1760    thread_self(Queue),
 1761    pengine_reply(Queue, Reply).
%!  remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
%
%   Issue an HTTP request for Action on Server and unify Reply with
%   the replied term. If Params is `[event=Event]`, Event is posted
%   as a Prolog term and only the pengine id is passed in the URL.
%   Otherwise Params are added to the URL query and the request is
%   opened using Options as is.

remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
    (   Params = [event=Event]
    ->  server_url(Server, Action, [id=ID], URL),
        HTTPOptions = [ post(prolog(Event))
                      | Options
                      ]
    ;   server_url(Server, Action, [id=ID|Params], URL),
        HTTPOptions = Options
    ),
    % Putting http_open/3 in setup_call_cleanup/3 makes it
    % impossible to interrupt.
    http_open(URL, Stream, HTTPOptions),
    call_cleanup(
        read_prolog_reply(Stream, Reply),
        close(Stream)).
 1784
 %   remote_post_rec(+Server, +Action, +Data, -Reply, +Options)
 %
 %   Post Data as JSON to the Action location of Server and unify Reply
 %   with the term read from the response. The target is probed first
 %   (see probe/2).

 remote_post_rec(Server, Action, Data, Reply, Options) :-
     server_url(Server, Action, [], Target),
     probe(Action, Target),
     PostOptions = [ post(json(Data))
                   | Options
                   ],
     http_open(Target, In, PostOptions),
     call_cleanup(
         read_prolog_reply(In, Reply),
         close(In)).
 probe(+Action, +URL) is det
Probe the target. This is a good idea before posting a large document and be faced with an authentication challenge. Possibly we should make this an option for simpler scenarios.
 1802probe(create, URL) :-
 1803    !,
 1804    http_open(URL, Stream, [method(options)]),
 1805    close(Stream).
 1806probe(_, _).
 1807
 %   read_prolog_reply(+In, -Reply)
 %
 %   Read one term from In as UTF-8 text and re-establish the bindings
 %   it may carry (see rebind_cycles/2).

 read_prolog_reply(In, Reply) :-
     set_stream(In, encoding(utf8)),
     read(In, Raw),
     rebind_cycles(Raw, Reply).
 1812
 %   rebind_cycles(+Reply0, -Reply)
 %
 %   If Reply0 has the shape @(Reply, Bindings) with Bindings a proper
 %   list, unify each Var = Value pair in Bindings and return the inner
 %   Reply. Any other term is returned as is.

 rebind_cycles(Reply0, Reply) :-
     (   Reply0 = @(Reply, Bindings),
         is_list(Bindings)
     ->  maplist(bind, Bindings)
     ;   Reply = Reply0
     ).
 1818
 %   bind(+Binding)
 %
 %   Binding is a term Var = Value; unify its two sides.

 bind(Value = Value).
 1821
 %   server_url(+Server, +Action, +Params, -URL)
 %
 %   Build the URL for a pengine request: `pengine/Action` is appended
 %   to the path of Server and Params (a list of Name=Value) becomes
 %   the query string.

 server_url(Server, Action, Params, URL) :-
     uri_components(Server, Base),
     uri_query_components(Search, Params),
     uri_data(path, Base, BasePath),
     atom_concat('pengine/', Action, Location),
     directory_file_path(BasePath, Location, FullPath),
     uri_data(path, Base, FullPath, Target),
     uri_data(search, Target, Search),
     uri_components(URL, Target).
 pengine_event(?EventTerm) is det
 pengine_event(?EventTerm, +Options) is det
Examines the pengine's event queue and if necessary blocks execution until a term that unifies to Term arrives in the queue. After a term from the queue has been unified to Term, the term is deleted from the queue.

Valid options are:

timeout(+Time)
Time is a float or integer and specifies the maximum time to wait in seconds. If no event has arrived before the time is up EventTerm is bound to the atom timeout.
listen(+Id)
Only listen to events from the pengine identified by Id. */
 1851pengine_event(Event) :-
 1852    pengine_event(Event, []).
 1853
 1854pengine_event(Event, Options) :-
 1855    thread_self(Self),
 1856    option(listen(Id), Options, _),
 1857    (   thread_get_message(Self, pengine_event(Id, Event), Options)
 1858    ->  true
 1859    ;   Event = timeout
 1860    ),
 1861    update_remote_destroy(Event).
 1862
 %   update_remote_destroy(+Event)
 %
 %   If Event signals destruction (destroy_event/1) of a pengine that is
 %   registered as remote, drop that registration. Always succeeds
 %   otherwise.

 update_remote_destroy(Event) :-
     (   destroy_event(Event),
         arg(1, Event, Pengine),
         pengine_remote(Pengine, _Server)
     ->  pengine_unregister_remote(Pengine)
     ;   true
     ).
 1870
 %   destroy_event(+Event) is semidet.
 %
 %   True if Event is destroy/1 or destroy/2, or a create/2 event whose
 %   feature list carries such an event as its (bound) answer.

 destroy_event(destroy(_)).
 destroy_event(destroy(_, _)).
 destroy_event(create(_, Data)) :-
     memberchk(answer(Nested), Data),
     !,
     nonvar(Nested),
     destroy_event(Nested).
 pengine_event_loop(:Closure, +Options) is det
Starts an event loop accepting event terms sent to the current pengine or thread. For each such event E, calls ignore(call(Closure, E)). A closure thus acts as a handler for the event. Some events are also treated specially:
create(ID, Term)
The ID is placed in a list of active pengines.
destroy(ID)
The ID is removed from the list of active pengines. When the last pengine ID is removed, the loop terminates.
output(ID, Term)
The predicate pengine_pull_response/2 is called.

Valid options are:

autoforward(+To)
Forwards received event terms to slaves. To is either all, all_but_sender or a Prolog list of NameOrIDs. [not yet implemented]

*/

 1906pengine_event_loop(Closure, Options) :-
 1907    child(_,_),
 1908    !,
 1909    pengine_event(Event),
 1910    (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
 1911    ->  forall(child(_,ID), pengine_send(ID, Event))
 1912    ;   true
 1913    ),
 1914    pengine_event_loop(Event, Closure, Options).
 1915pengine_event_loop(_, _).
 1916
 :- meta_predicate
     pengine_process_event(+, 1, -, +).

 %   pengine_event_loop(+Event, :Closure, +Options)
 %
 %   Process Event and re-enter pengine_event_loop/2 if processing
 %   reports that the loop must continue.

 pengine_event_loop(Event, Closure, Options) :-
     pengine_process_event(Event, Closure, Again, Options),
     (   Again == true
     ->  pengine_event_loop(Closure, Options)
     ;   true
     ).
 1926
 %   pengine_process_event(+Event, :Closure, -Continue, +Options)
 %
 %   Hand Event to Closure and perform the bookkeeping that belongs to
 %   it. A create/2 event that embeds an answer is split: Closure first
 %   sees the create event without the answer, then the answer is
 %   processed as an event of its own. output/2 and debug/2 events are
 %   followed by pengine_pull_response/2. If Closure does not succeed
 %   on an error/2 event, all children are destroyed and the error is
 %   re-thrown. destroy/1 removes the child registration of the pengine.

 pengine_process_event(create(Pengine, Features), Closure, Continue, Options) :-
     debug(pengine(transition), '~q: 1 = /~q => 2', [Pengine, create(Features)]),
     (   select(answer(Answer), Features, Features1)
     ->  ignore(call(Closure, create(Pengine, Features1))),
         pengine_process_event(Answer, Closure, Continue, Options)
     ;   ignore(call(Closure, create(Pengine, Features))),
         Continue = true
     ).
 pengine_process_event(output(Pengine, Message), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 4', [Pengine, output(Message)]),
     ignore(call(Closure, output(Pengine, Message))),
     pengine_pull_response(Pengine, []).
 pengine_process_event(debug(Pengine, Message), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 4', [Pengine, debug(Message)]),
     ignore(call(Closure, debug(Pengine, Message))),
     pengine_pull_response(Pengine, []).
 pengine_process_event(prompt(Pengine, Prompt), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 5', [Pengine, prompt(Prompt)]),
     ignore(call(Closure, prompt(Pengine, Prompt))).
 pengine_process_event(success(Pengine, Answers, _Proj, _Time, More),
                       Closure, true, _) :-
     debug(pengine(transition), '~q: 3 = /~q => 6/2',
           [Pengine, success(Answers, More)]),
     ignore(call(Closure, success(Pengine, Answers, More))).
 pengine_process_event(failure(Pengine, _Time), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 2', [Pengine, failure]),
     ignore(call(Closure, failure(Pengine))).
 pengine_process_event(error(Pengine, Ball), Closure, Continue, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 2', [Pengine, error(Ball)]),
     (   call(Closure, error(Pengine, Ball))
     ->  Continue = true
     ;   forall(child(_, Slave), pengine_destroy(Slave)),
         throw(Ball)
     ).
 pengine_process_event(stop(Pengine), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 7 = /~q => 2', [Pengine, stop]),
     ignore(call(Closure, stop(Pengine))).
 pengine_process_event(destroy(Pengine, Last), Closure, Continue, Options) :-
     pengine_process_event(Last, Closure, _, Options),
     pengine_process_event(destroy(Pengine), Closure, Continue, Options).
 pengine_process_event(destroy(Pengine), Closure, true, _Options) :-
     retractall(child(_, Pengine)),
     debug(pengine(transition), '~q: 1 = /~q => 0', [Pengine, destroy]),
     ignore(call(Closure, destroy(Pengine))).
 pengine_rpc(+URL, +Query) is nondet
 pengine_rpc(+URL, +Query, +Options) is nondet
Semantically equivalent to the sequence below, except that the query is executed in (and in the Prolog context of) the pengine server referred to by URL, rather than locally.
  copy_term_nat(Query, Copy),  % attributes are not copied to the server
  call(Copy),			 % executed on server at URL
  Query = Copy.

Valid options are:

chunk(+IntegerOrFalse)
Can be used to reduce the number of network roundtrips being made. See pengine_ask/3.
timeout(+Time)
Wait at most Time seconds for the next event from the server. The default is defined by the setting pengines:time_limit.

Remaining options (except the server option) are passed to pengine_create/1. */

 1997pengine_rpc(URL, Query) :-
 1998    pengine_rpc(URL, Query, []).
 1999
 2000pengine_rpc(URL, Query, M:Options0) :-
 2001    translate_local_sources(Options0, Options1, M),
 2002    (  option(timeout(_), Options1)
 2003    -> Options = Options1
 2004    ;  setting(time_limit, Limit),
 2005       Options = [timeout(Limit)|Options1]
 2006    ),
 2007    term_variables(Query, Vars),
 2008    Template =.. [v|Vars],
 2009    State = destroy(true),              % modified by process_event/4
 2010    setup_call_catcher_cleanup(
 2011        pengine_create([ ask(Query),
 2012                         template(Template),
 2013                         server(URL),
 2014                         id(Id)
 2015                       | Options
 2016                       ]),
 2017        wait_event(Template, State, [listen(Id)|Options]),
 2018        Why,
 2019        pengine_destroy_and_wait(State, Id, Why)).
 2020
 %   pengine_destroy_and_wait(+State, +Id, +Why)
 %
 %   Cleanup handler of pengine_rpc/3. If State is still destroy(true),
 %   destroy pengine Id and wait for that to be confirmed. Otherwise
 %   only log that nothing needs to be destroyed. Why is the catcher
 %   term and is used for debug messages only.

 pengine_destroy_and_wait(State, Id, Why) :-
     (   State = destroy(true)
     ->  debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
         pengine_destroy(Id),
         wait_destroy(Id, 10)
     ;   debug(pengine(rpc), 'Not destroying RPC (~p)', [Why])
     ).
 2028
 %   wait_destroy(+Id, +Retries)
 %
 %   Wait until pengine Id is no longer registered as a child. Each
 %   round waits at most 10 seconds for an event from Id. A destroy
 %   event ends the wait; any other outcome uses up one of Retries.
 %   When Retries is exhausted, the remote registration and the child
 %   registration are dropped regardless.

 wait_destroy(Id, Retries) :-
     (   \+ child(_, Id)
     ->  true
     ;   once(pengine_event(Event, [listen(Id),timeout(10)])),
         (   destroy_event(Event)
         ->  retractall(child(_, Id))
         ;   succ(Left, Retries)
         ->  wait_destroy(Id, Left)
         ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
             pengine_unregister_remote(Id),
             retractall(child(_, Id))
         )
     ).
 2043
 %   wait_event(?Template, +State, +Options)
 %
 %   Wait for the next event selected by Options and pass it to
 %   process_event/4.

 wait_event(Template, State, Options) :-
     pengine_event(Received, Options),
     debug(pengine(event), 'Received ~p', [Received]),
     process_event(Received, Template, State, Options).
 2048
 %   process_event(+Event, ?Template, +State, +Options) is nondet.
 %
 %   Act on one event received by pengine_rpc/3. Solutions are returned
 %   by unifying Template with the members of the answer list; if the
 %   pengine has more, the next chunk is requested on backtracking.
 %   Errors are re-thrown, failure fails, prompt/output/debug events are
 %   served and followed by waiting for the next event. A destroy/2
 %   event sets the first argument of State to `false`, so that the
 %   cleanup of pengine_rpc/3 does not destroy the pengine again.

 process_event(create(_Pengine, Features), Template, State, Options) :-
     memberchk(answer(Answer), Features),
     process_event(Answer, Template, State, Options).
 process_event(error(_Pengine, Ball), _Template, _, _Options) :-
     throw(Ball).
 process_event(failure(_Pengine, _Time), _Template, _, _Options) :-
     fail.
 process_event(prompt(Pengine, Prompt), Template, State, Options) :-
     pengine_rpc_prompt(Pengine, Prompt, Input),
     pengine_send(Pengine, input(Input)),
     wait_event(Template, State, Options).
 process_event(output(Pengine, Term), Template, State, Options) :-
     pengine_rpc_output(Pengine, Term),
     pengine_pull_response(Pengine, Options),
     wait_event(Template, State, Options).
 process_event(debug(Pengine, Message), Template, State, Options) :-
     debug(pengine(debug), '~w', [Message]),
     pengine_pull_response(Pengine, Options),
     wait_event(Template, State, Options).
 process_event(success(_Pengine, Answers, _Proj, _Time, false),
               Template, _, _Options) :-
     !,
     member(Template, Answers).
 process_event(success(Pengine, Answers, _Proj, _Time, true),
               Template, State, Options) :-
     (   member(Template, Answers)
     ;   pengine_next(Pengine, Options),
         wait_event(Template, State, Options)
     ).
 process_event(destroy(Pengine, Last), Template, State, Options) :-
     !,
     retractall(child(_, Pengine)),
     nb_setarg(1, State, false),
     debug(pengine(destroy), 'State: ~p~n', [State]),
     process_event(Last, Template, State, Options).
 % compatibility with older versions of the protocol.
 process_event(success(Pengine, Answers, Time, More),
               Template, State, Options) :-
     process_event(success(Pengine, Answers, _Proj, Time, More),
                   Template, State, Options).
 2089
 2090
 %   pengine_rpc_prompt(+ID, +Prompt, -Term)
 %
 %   Obtain the answer to a prompt from the remote pengine. The hook
 %   prompt/3 is tried first. If it fails, Term is read with read/1
 %   while Prompt is installed as the prompt; the old prompt is
 %   restored afterwards.

 pengine_rpc_prompt(ID, Prompt, Term) :-
     (   prompt(ID, Prompt, Answer)
     ->  Term = Answer
     ;   setup_call_cleanup(
             prompt(Saved, Prompt),
             read(Term),
             prompt(_, Saved))
     ).
 2100
 %   pengine_rpc_output(+ID, +Term)
 %
 %   Handle output from the remote pengine: try the hook output/2 and
 %   fall back to print/1 if the hook fails.

 pengine_rpc_output(ID, Term) :-
     (   output(ID, Term)
     ->  true
     ;   print(Term)
     ).
 prompt(+ID, +Prompt, -Term) is semidet
Hook to handle pengine_input/2 from the remote pengine. If the hooks fails, pengine_rpc/3 calls read/1 using the current prompt.
 2112:- multifile prompt/3.
 output(+ID, +Term) is semidet
Hook to handle pengine_output/1 from the remote pengine. If the hook fails, it calls print/1 on Term.
 2119:- multifile output/2. 2120
 2121
 2122/*================= HTTP handlers =======================
 2123*/
 2124
 %   HTTP locations served by this module and how they are served. The
 %   create, send and pull_response handlers use time_limit(infinite)
 %   because pengines have their own timeout. They also use spawn:
 %   many clients can easily be waiting for some action on a pengine to
 %   complete and, without spawning, these would quickly exhaust the
 %   worker pool of the HTTP server.
 %
 %   FIXME: probably we should wait for a short time for the pengine on
 %   the default worker thread. Only if that time has expired, we can
 %   call http_spawn/2 to continue waiting on a new thread. That would
 %   improve the performance and reduce the usage of threads.

 :- http_handler(root(pengine),
                 http_404([]),
                 [ id(pengines) ]).
 :- http_handler(root(pengine/create),
                 http_pengine_create,
                 [ time_limit(infinite), spawn([]) ]).
 :- http_handler(root(pengine/send),
                 http_pengine_send,
                 [ time_limit(infinite), spawn([]) ]).
 :- http_handler(root(pengine/pull_response),
                 http_pengine_pull_response,
                 [ time_limit(infinite), spawn([]) ]).
 :- http_handler(root(pengine/abort),       http_pengine_abort,       []).
 :- http_handler(root(pengine/detach),      http_pengine_detach,      []).
 :- http_handler(root(pengine/list),        http_pengine_list,        []).
 :- http_handler(root(pengine/ping),        http_pengine_ping,        []).
 :- http_handler(root(pengine/destroy_all), http_pengine_destroy_all, []).

 :- http_handler(root(pengine/'pengines.js'),
                 http_reply_file(library('http/web/js/pengines.js'), []),
                 []).
 :- http_handler(root(pengine/'plterm.css'),
                 http_reply_file(library('http/web/css/plterm.css'), []),
                 []).
 http_pengine_create(+Request)
HTTP POST handler for =/pengine/create=. This API accepts the pengine creation parameters both as application/json and as www-form-encoded. Accepted parameters:
ParameterDefaultComment
formatprologOutput format
applicationpengine_sandboxPengine application
chunk1Chunk-size for results
solutionschunkedIf all, emit all results
ask-The query
template-Output template
src_text""Program
src_url-Program to download
disposition-Download location

Note that solutions=all internally uses chunking to obtain the results from the pengine, but the results are combined in a single HTTP reply. This is currently only implemented by the CSV backend that is part of SWISH for downloading unbounded result sets with limited memory resources.

Using chunk=false simulates the recursive toplevel. See pengine_ask/3.

 2184http_pengine_create(Request) :-
 2185    reply_options(Request, [post]),
 2186    !.
 2187http_pengine_create(Request) :-
 2188    memberchk(content_type(CT), Request),
 2189    sub_atom(CT, 0, _, _, 'application/json'),
 2190    !,
 2191    http_read_json_dict(Request, Dict),
 2192    dict_atom_option(format, Dict, Format, prolog),
 2193    dict_atom_option(application, Dict, Application, pengine_sandbox),
 2194    http_pengine_create(Request, Application, Format, Dict).
 2195http_pengine_create(Request) :-
 2196    Optional = [optional(true)],
 2197    OptString = [string|Optional],
 2198    Form = [ format(Format, [default(prolog)]),
 2199             application(Application, [default(pengine_sandbox)]),
 2200             chunk(_, [nonneg;oneof([false]), default(1)]),
 2201             solutions(_, [oneof([all,chunked]), default(chunked)]),
 2202             ask(_, OptString),
 2203             template(_, OptString),
 2204             src_text(_, OptString),
 2205             disposition(_, OptString),
 2206             src_url(_, Optional)
 2207           ],
 2208    http_parameters(Request, Form),
 2209    form_dict(Form, Dict),
 2210    http_pengine_create(Request, Application, Format, Dict).
 2211
 %   dict_atom_option(+Key, +Dict, -Atom, +Default)
 %
 %   Atom is the value of Key in Dict converted with atom_string/2, or
 %   Default if Dict has no such key.

 dict_atom_option(Key, Dict, Atom, _Default) :-
     get_dict(Key, Dict, Text),
     !,
     atom_string(Atom, Text).
 dict_atom_option(_Key, _Dict, Default, Default).
 2217
 %   form_dict(+Form, -Dict)
 %
 %   Dict holds a Name-Value entry for every field of the
 %   http_parameters/2 specification Form whose value is bound.

 form_dict(Form, Dict) :-
     form_values(Form, Bound),
     dict_pairs(Dict, _, Bound).
 2221
 %   form_values(+Form, -Pairs)
 %
 %   Pairs is a list Name-Value for the terms Name(Value, ...) in Form
 %   whose first argument is bound; terms with an unbound first
 %   argument are skipped.

 form_values([], []).
 form_values([Field|Fields], Pairs) :-
     (   arg(1, Field, Value),
         nonvar(Value)
     ->  functor(Field, Name, _),
         Pairs = [Name-Value|Rest]
     ;   Pairs = Rest
     ),
     form_values(Fields, Rest).
 http_pengine_create(+Request, +Application, +Format, +OptionsDict)
 2236http_pengine_create(Request, Application, Format, Dict) :-
 2237    current_application(Application),
 2238    !,
 2239    allowed(Request, Application),
 2240    authenticate(Request, Application, UserOptions),
 2241    dict_to_options(Dict, Application, CreateOptions0),
 2242    append(UserOptions, CreateOptions0, CreateOptions),
 2243    pengine_uuid(Pengine),
 2244    message_queue_create(Queue, [max_size(25)]),
 2245    setting(Application:time_limit, TimeLimit),
 2246    get_time(Now),
 2247    asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)),
 2248    broadcast(pengine(create(Pengine, Application, CreateOptions))),
 2249    create(Queue, Pengine, CreateOptions, http, Application),
 2250    create_wait_and_output_result(Pengine, Queue, Format,
 2251                                  TimeLimit, Dict),
 2252    gc_abandoned_queues.
 2253http_pengine_create(_Request, Application, Format, _Dict) :-
 2254    Error = existence_error(pengine_application, Application),
 2255    pengine_uuid(ID),
 2256    output_result(Format, error(ID, error(Error, _))).
 2257
 2258
 %   dict_to_options(+Dict, +Application, -CreateOptions)
 %
 %   CreateOptions is the list of pengine create options found in Dict
 %   (see pairs_create_options/3).

 dict_to_options(Dict, Application, CreateOptions) :-
     dict_pairs(Dict, _Tag, KeyValues),
     pairs_create_options(KeyValues, Application, CreateOptions).
 2262
 %   pairs_create_options(+Pairs, +Application, -Options)
 %
 %   Turn the Name-Value pairs that name a pengine create option, other
 %   than `user`, into option terms; all other pairs are dropped. Values
 %   of options typed `atom` by create_option_type/2 are converted to
 %   atoms. Other values are passed as they are: term creation must be
 %   done once we have created the source and know the operators.

 pairs_create_options([], _, []) :- !.
 pairs_create_options([Name-Raw|Pairs], App, [Option|Options]) :-
     Option =.. [Name, Value],
     pengine_create_option(Option),
     Name \== user,
     !,
     (   create_option_type(Option, atom)
     ->  atom_string(Value, Raw)
     ;   Value = Raw
     ),
     pairs_create_options(Pairs, App, Options).
 pairs_create_options([_|Pairs], App, Options) :-
     pairs_create_options(Pairs, App, Options).
 wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit) is det
Wait for the Pengine's Queue and if there is a message, send it to the requester using output_result/1. If Pengine does not answer within the time specified by the setting time_limit, Pengine is aborted and the result is error(time_limit_exceeded, _).
 2285wait_and_output_result(Pengine, Queue, Format, TimeLimit) :-
 2286    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2287                                 [ timeout(TimeLimit)
 2288                                 ]),
 2289              Error, true)
 2290    ->  (   var(Error)
 2291        ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
 2292            ignore(destroy_queue_from_http(Pengine, Event, Queue)),
 2293            protect_pengine(Pengine, output_result(Format, Event))
 2294        ;   output_result(Format, died(Pengine))
 2295        )
 2296    ;   time_limit_exceeded(Pengine, Format)
 2297    ).
 create_wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit, +Dict) is det
Intercepts the `solutions=all` case used for downloading results. Dict may contain a `disposition` key to denote the download location.
 %   First clause: Dict asks for `solutions=all`. This is a
 %   failure-driven loop over between/3 in which Page counts the events
 %   received so far. Every event is emitted as page(Page, Event). The
 %   cuts inside the conditional are clause-level cuts: they remove the
 %   choice point of between/3 and thereby end the loop.
 2306create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
 2307    get_dict(solutions, Dict, all),
 2308    !,
 2309    between(1, infinite, Page),
 2310    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2311                                 [ timeout(TimeLimit)
 2312                                 ]),
 2313              Error, true)
 2314    ->  (   var(Error)
 2315        ->  debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Event, Queue]),
 %           Case 1: the queue could be handled as destroyed; last page.
 2316            (   destroy_queue_from_http(Pengine, Event, Queue)
 2317            ->  !,
 2318                protect_pengine(Pengine,
 2319                                output_result(Format, page(Page, Event), Dict))
 %           Case 2: more answers; request the next chunk, emit this
 %           page and fail back into between/3 for the next one.
 2320            ;   is_more_event(Event)
 2321            ->  pengine_thread(Pengine, Thread),
 2322                thread_send_message(Thread, pengine_request(next)),
 2323                protect_pengine(Pengine,
 2324                                output_result(Format, page(Page, Event), Dict)),
 2325                fail
 %           Case 3: any other event is the final page.
 2326            ;   !,
 2327                protect_pengine(Pengine,
 2328                                output_result(Format, page(Page, Event), Dict))
 2329            )
 %       Waiting for the message raised an exception.
 2330        ;   !, output_result(Format, died(Pengine))
 2331        )
 %   No message arrived within TimeLimit.
 2332    ;   !, time_limit_exceeded(Pengine, Format)
 2333    ),
 2334    !.
 %   Second clause: all other requests are answered with a single event.
 2335create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, _Dict) :-
 2336    wait_and_output_result(Pengine, Queue, Format, TimeLimit).
 2337
 %   is_more_event(+Event) is semidet.
 %
 %   True if Event is a success/5 event whose last argument is `true`,
 %   or a create/2 event that carries such an event as answer.

 is_more_event(success(_Pengine, _Results, _Proj, _CPU, true)).
 is_more_event(create(_Pengine, Features)) :-
     memberchk(answer(Nested), Features),
     is_more_event(Nested).
 time_limit_exceeded(+Pengine, +Format)
The Pengine did not reply within its time limit. Send a reply to the client in the requested format and interrupt the Pengine.
bug
- Ideally, if the Pengine has destroy set to false, we should get the Pengine back to its main loop. Unfortunately we only have normal exceptions that may be caught by the Pengine and abort which cannot be caught and thus destroys the Pengine.
 2355time_limit_exceeded(Pengine, Format) :-
 2356    call_cleanup(
 2357        pengine_destroy(Pengine, [force(true)]),
 2358        output_result(Format,
 2359                      destroy(Pengine,
 2360                              error(Pengine, time_limit_exceeded)))).
 destroy_queue_from_http(+Pengine, +Event, +Queue) is semidet
Consider destroying the output queue for Pengine after sending Event back to the HTTP client. We can destroy the queue if
To be done
- If the client did not request all output, the queue will not be destroyed. We need some timeout and GC for that.
 2375destroy_queue_from_http(ID, _, Queue) :-
 2376    output_queue(ID, Queue, _),
 2377    !,
 2378    destroy_queue_if_empty(Queue).
 2379destroy_queue_from_http(ID, Event, Queue) :-
 2380    debug(pengine(destroy), 'DESTROY? ~p', [Event]),
 2381    is_destroy_event(Event),
 2382    !,
 2383    message_queue_property(Queue, size(Waiting)),
 2384    debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]),
 2385    with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)).
 2386
 %   is_destroy_event(+Event) is semidet.
 %
 %   True if Event is destroy/1 or destroy/2, or a create/2 event that
 %   carries such an event as answer.

 is_destroy_event(destroy(_Pengine)).
 is_destroy_event(destroy(_Pengine, _Last)).
 is_destroy_event(create(_Pengine, Features)) :-
     memberchk(answer(Nested), Features),
     is_destroy_event(Nested).
 2392
 %   destroy_queue_if_empty(+Queue)
 %
 %   Leave Queue alone while it still holds a message. Otherwise remove
 %   its output_queue/3 registration and destroy it.

 destroy_queue_if_empty(Queue) :-
     (   thread_peek_message(Queue, _)
     ->  true
     ;   retractall(output_queue(_, Queue, _)),
         message_queue_destroy(Queue)
     ).
 gc_abandoned_queues
Check whether there are queues that have been abadoned. This happens if the stream contains output events and not all of them are read by the client.
 2406:- dynamic
 2407    last_gc/1. 2408
 2409gc_abandoned_queues :-
 2410    consider_queue_gc,
 2411    !,
 2412    get_time(Now),
 2413    (   output_queue(_, Queue, Time),
 2414        Now-Time > 15*60,
 2415        retract(output_queue(_, Queue, Time)),
 2416        message_queue_destroy(Queue),
 2417        fail
 2418    ;   retractall(last_gc(_)),
 2419        asserta(last_gc(Now))
 2420    ).
 2421gc_abandoned_queues.
 2422
 %   consider_queue_gc is semidet.
 %
 %   True if output_queue/3 has more than 100 clauses and the last
 %   collection was more than 5 minutes ago or never took place.

 consider_queue_gc :-
     predicate_property(output_queue(_,_,_), number_of_clauses(Count)),
     Count > 100,
     (   last_gc(Last),
         get_time(Now),
         Now-Last > 5*60
     ->  true
     ;   \+ last_gc(_)
     ).
 sync_destroy_queue_from_http(+Pengine, +Queue) is det
 sync_delay_destroy_queue(+Pengine, +Queue) is det
Handle destruction of the message queue connecting the HTTP side to the pengine. We cannot delete the queue when the pengine dies because the queue may contain output events. Termination of the pengine and finishing the HTTP exchange may happen in both orders. This means we need handle this using synchronization.
sync_destroy_queue_from_pengine(+Pengine, +Queue)
Called (indirectly) from pengine_done/1 if the pengine's thread dies.
sync_destroy_queue_from_http(+Pengine, +Queue)
Called from destroy_queue/3, from wait_and_output_result/4, i.e., from the HTTP side.
 2449:- dynamic output_queue_destroyed/1. 2450
 2451sync_destroy_queue_from_http(ID, Queue) :-
 2452    (   output_queue(ID, Queue, _)
 2453    ->  destroy_queue_if_empty(Queue)
 2454    ;   thread_peek_message(Queue, pengine_event(_, output(_,_)))
 2455    ->  debug(pengine(destroy), 'Delay destruction of ~p because of output',
 2456              [Queue]),
 2457        get_time(Now),
 2458        asserta(output_queue(ID, Queue, Now))
 2459    ;   message_queue_destroy(Queue),
 2460        asserta(output_queue_destroyed(Queue))
 2461    ).
 sync_destroy_queue_from_pengine(+Pengine, +Queue)
Called from pengine_unregister/1 when the pengine thread terminates. It is called while the mutex pengine held.
 2468sync_destroy_queue_from_pengine(ID, Queue) :-
 2469    (   retract(output_queue_destroyed(Queue))
 2470    ->  true
 2471    ;   get_time(Now),
 2472        asserta(output_queue(ID, Queue, Now))
 2473    ),
 2474    retractall(pengine_queue(ID, Queue, _, _)).
 2475
 2476
 %   http_pengine_send(+Request)
 %
 %   HTTP handler for /pengine/send. Reads the event from the `event`
 %   parameter or the posted data, forwards it to the pengine thread as
 %   pengine_request(Event) and replies with the next event from the
 %   pengine's queue. If the pengine no longer exists, the reply is an
 %   existence error for the pengine; any other error raised while
 %   reading the event is reported as error(ID, Error).

 http_pengine_send(Request) :-
     reply_options(Request, [get,post]),
     !.
 http_pengine_send(Request) :-
     http_parameters(Request,
                     [ id(ID, [ type(atom) ]),
                       event(EventText, [optional(true)]),
                       format(Format, [default(prolog)])
                     ]),
     catch(read_event(ID, Request, Format, EventText, Event),
           Ball,
           true),
     (   var(Ball)
     ->  debug(pengine(event), 'HTTP send: ~p', [Event]),
         (   pengine_thread(ID, Thread)
         ->  pengine_queue(ID, Queue, TimeLimit, _),
             random_delay,
             broadcast(pengine(send(ID, Event))),
             thread_send_message(Thread, pengine_request(Event)),
             wait_and_output_result(ID, Queue, Format, TimeLimit)
         ;   atom(ID)
         ->  pengine_died(Format, ID)
         ;   http_404([], Request)
         )
     ;   Ball = error(existence_error(pengine, ID), _)
     ->  pengine_died(Format, ID)
     ;   output_result(Format, error(ID, Ball))
     ).
 2505
 %   pengine_died(+Format, +Pengine)
 %
 %   Reply in Format that Pengine does not exist.

 pengine_died(Format, Pengine) :-
     Gone = error(existence_error(pengine, Pengine), _),
     output_result(Format, error(Pengine, Gone)).
 read_event(+Pengine, +Request, +Format, +EventString, -Event) is det
Read an event on behalve of Pengine. Note that the pengine's module should not be deleted while we are reading using its syntax (module). This is ensured using the pengine_done mutex.
See also
- pengine_done/0.
 2519read_event(Pengine, Request, Format, EventString, Event) :-
 2520    protect_pengine(
 2521        Pengine,
 2522        ( get_pengine_module(Pengine, Module),
 2523          read_event_2(Request, EventString, Module, Event0, Bindings)
 2524        )),
 2525    !,
 2526    fix_bindings(Format, Event0, Bindings, Event).
 2527read_event(Pengine, Request, _Format, _EventString, _Event) :-
 2528    debug(pengine(event), 'Pengine ~q vanished', [Pengine]),
 2529    discard_post_data(Request),
 2530    existence_error(pengine, Pengine).
 read_event_(+Request, +EventString, +Module, -Event, -Bindings)
Read the sent event. The event is a Prolog term that is either in the event parameter or as a posted document.
 2538read_event_2(_Request, EventString, Module, Event, Bindings) :-
 2539    nonvar(EventString),
 2540    !,
 2541    term_string(Event, EventString,
 2542                [ variable_names(Bindings),
 2543                  module(Module)
 2544                ]).
 2545read_event_2(Request, _EventString, Module, Event, Bindings) :-
 2546    option(method(post), Request),
 2547    http_read_data(Request,     Event,
 2548                   [ content_type('application/x-prolog'),
 2549                     module(Module),
 2550                     variable_names(Bindings)
 2551                   ]).
 discard_post_data(+Request) is det
If this is a POST request, discard the posted data.
 2557discard_post_data(Request) :-
 2558    option(method(post), Request),
 2559    !,
 2560    setup_call_cleanup(
 2561        open_null_stream(NULL),
 2562        http_read_data(Request, _, [to(stream(NULL))]),
 2563        close(NULL)).
 2564discard_post_data(_).
 fix_bindings(+Format, +EventIn, +Bindings, -Event) is det
Generate the template for json(-s) Format from the variables in the asked Goal. Variables starting with an underscore, followed by an capital letter are ignored from the template.
 2572fix_bindings(Format,
 2573             ask(Goal, Options0), Bindings,
 2574             ask(Goal, NewOptions)) :-
 2575    json_lang(Format),
 2576    !,
 2577    exclude(anon, Bindings, NamedBindings),
 2578    template(NamedBindings, Template, Options0, Options1),
 2579    select_option(chunk(Paging), Options1, Options2, 1),
 2580    NewOptions = [ template(Template),
 2581                   chunk(Paging),
 2582                   bindings(NamedBindings)
 2583                 | Options2
 2584                 ].
 2585fix_bindings(_, Command, _, Command).
 2586
 %   template(+Bindings, -Template, +Options0, -Options)
 %
 %   Template is the argument of a template/1 option in Options0, which
 %   is removed to obtain Options. Without such an option, Options is
 %   Options0 and Template is a dict with tag swish_default_template
 %   created from Bindings.

 template(Bindings, Template, Options0, Options) :-
     (   select_option(template(Template), Options0, Options)
     ->  true
     ;   Options = Options0,
         dict_create(Template, swish_default_template, Bindings)
     ).
 2592
 %   anon(+Binding) is semidet.
 %
 %   True if Binding is Name=_ and Name starts with an underscore that
 %   is followed by a character of type prolog_var_start.

 anon(Name=_Value) :-
     sub_atom(Name, 0, _, _, '_'),
     sub_atom(Name, 1, 1, _, Second),
     char_type(Second, prolog_var_start).
 2597
 %   var_name(+Binding, -Name)
 %
 %   Name is the left-hand side of the binding Name=Value.

 var_name(Binding, Name) :-
     Binding = (Name=_).
 json_lang(+Format) is semidet
True if Format is a JSON variation.
 2605json_lang(json) :- !.
 2606json_lang(Format) :-
 2607    sub_atom(Format, 0, _, _, 'json-').
 http_pengine_pull_response(+Request)
HTTP handler for /pengine/pull_response. Pulls possible pending messages from the pengine.
 %!  http_pengine_pull_response(+Request)
 %
 %   HTTP handler that pulls a pending message from a pengine.  An
 %   OPTIONS request is answered by reply_options/2.  Otherwise the
 %   `id` and `format` (default `prolog`) parameters are read, the
 %   pengine is re-attached and the next result is written from its
 %   queue.  Replies 404 if no queue is known for the id.
 http_pengine_pull_response(Request) :-
     reply_options(Request, [get]),
     !.
 http_pengine_pull_response(Request) :-
     http_parameters(Request,
                     [ id(ID, []),
                       format(Format, [default(prolog)])
                     ]),
     reattach(ID),
     (   pull_response_queue(ID, Queue, TimeLimit)
     ->  wait_and_output_result(ID, Queue, Format, TimeLimit)
     ;   http_404([], Request)
     ).

 %   Find the queue to read from: the queue registered in
 %   pengine_queue/4 with its time limit, or else the one in
 %   output_queue/3 with a time limit of 0.
 pull_response_queue(ID, Queue, TimeLimit) :-
     pengine_queue(ID, Queue, TimeLimit, _),
     !.
 pull_response_queue(ID, Queue, 0) :-
     output_queue(ID, Queue, _).
 http_pengine_abort(+Request)
HTTP handler for /pengine/abort. Note that abort may be sent at any time and the reply may be handled by a pull_response. In that case, our pengine has already died before we get to wait_and_output_result/4.
 %!  http_pengine_abort(+Request)
 %
 %   HTTP handler that aborts the pengine given by the `id` parameter.
 %   An OPTIONS request is answered by reply_options/2.  If the pengine
 %   has no thread the reply is a 404.  Otherwise the abort is
 %   broadcast, threads producing output for the pengine are
 %   interrupted, the pengine is aborted and `true` is sent as JSON.
 http_pengine_abort(Request) :-
     reply_options(Request, [get,post]),
     !.
 http_pengine_abort(Request) :-
     http_parameters(Request, [id(Pengine, [])]),
     (   \+ pengine_thread(Pengine, _)
     ->  http_404([], Request)
     ;   broadcast(pengine(abort(Pengine))),
         abort_pending_output(Pengine),
         pengine_abort(Pengine),
         reply_json(true)
     ).
 http_pengine_detach(+Request)
Detach a Pengine while keeping it running. This has the following consequences:
 %!  http_pengine_detach(+Request)
 %
 %   HTTP handler that detaches the pengine given by the `id`
 %   parameter while keeping it running.  The request body is read as
 %   a JSON dict and stored, extended with the current time, in
 %   pengine_detached/2.  The maximum size of the pengine's output
 %   queue is raised to 1000 and a detached(ID) event is queued.
 %
 %   Replies 404 if the pengine has no application property or no
 %   output queue.  allowed/2 and authenticate/3 may raise an HTTP
 %   reply exception.
 %
 %   The queue is looked up as part of the condition, before any side
 %   effect.  Looking it up afterwards could make the handler fail
 %   after the detach was already broadcast and recorded, leaving a
 %   stale pengine_detached/2 fact behind.
 http_pengine_detach(Request) :-
     reply_options(Request, [post]),
     !.
 http_pengine_detach(Request) :-
     http_parameters(Request,
                     [ id(ID, [])
                     ]),
     http_read_json_dict(Request, ClientData),
     (   pengine_property(ID, application(Application)),
         allowed(Request, Application),
         authenticate(Request, Application, _UserOptions),
         pengine_queue(ID, Queue, _TimeLimit, _Created)
     ->  broadcast(pengine(detach(ID))),
         get_time(Now),
         assertz(pengine_detached(ID, ClientData.put(time, Now))),
         message_queue_set(Queue, max_size(1000)),
         pengine_reply(Queue, detached(ID)),
         reply_json(true)
     ;   http_404([], Request)
     ).
 2684
 %   Fallback for systems that lack message_queue_set/2: accept and
 %   ignore the request.
 :- if(\+ current_predicate(message_queue_set/2)).
 message_queue_set(_Queue, _Property).
 :- endif.

 %!  reattach(+ID) is semidet.
 %
 %   If pengine ID is recorded as detached, remove that record and set
 %   the maximum size of its output queue to 25.  Succeeds without
 %   doing anything if the pengine is not detached or has no queue.
 reattach(ID) :-
     retract(pengine_detached(ID, _)),
     pengine_queue(ID, Queue, _, _),
     !,
     message_queue_set(Queue, max_size(25)).
 reattach(_).
 http_pengine_destroy_all(+Request)
Destroy a list of pengines. Normally called by pengines.js if the browser window is closed.
 %!  http_pengine_destroy_all(+Request)
 %
 %   HTTP handler that destroys a list of pengines.  The `ids`
 %   parameter holds the pengine ids separated by commas.  Pengines
 %   recorded in pengine_detached/2 are left alone; all others are
 %   destroyed with force(true).  Replies the JSON string "ok".
 http_pengine_destroy_all(Request) :-
     reply_options(Request, [get,post]),
     !.
 http_pengine_destroy_all(Request) :-
     http_parameters(Request, [ids(IDsAtom, [])]),
     atomic_list_concat(IDs, ',', IDsAtom),
     forall(attached_pengine_id(IDs, ID),
            pengine_destroy(ID, [force(true)])),
     reply_json("ok").

 %   Enumerate the members of IDs that are not detached.
 attached_pengine_id(IDs, ID) :-
     member(ID, IDs),
     \+ pengine_detached(ID, _).
 http_pengine_ping(+Request)
HTTP handler for /pengine/ping. If the requested Pengine is alive, an event ping(Pengine, Stats) is created, where Stats is the result of thread_statistics/2. Otherwise the event died(Pengine) is returned.
 %!  http_pengine_ping(+Request)
 %
 %   HTTP handler that reports whether a pengine is alive.  Reads the
 %   `id` and `format` (default `prolog`) parameters and outputs
 %   ping(Pengine, Stats) if the pengine has a thread whose statistics
 %   can be obtained, and died(Pengine) otherwise.
 http_pengine_ping(Request) :-
     reply_options(Request, [get]),
     !.
 http_pengine_ping(Request) :-
     http_parameters(Request,
                     [ id(Pengine, []),
                       format(Format, [default(prolog)])
                     ]),
     (   pengine_thread(Pengine, Thread),
         % an error(_,_) exception from thread_statistics/2 counts as "died"
         catch(thread_statistics(Thread, Stats), error(_,_), fail)
     ->  Event = ping(Pengine, Stats)
     ;   Event = died(Pengine)
     ),
     output_result(Format, Event).
 http_pengine_list(+Request)
HTTP handler for `/pengine/list`, providing information about running Pengines.
To be done
- Only list detached Pengines associated to the logged in user.
 %!  http_pengine_list(+Request)
 %
 %   HTTP handler that lists pengines.  Parameters are `status`
 %   (only `detached` is accepted, which is also the default) and
 %   `application` (default `pengine_sandbox`).  After the access
 %   checks of allowed/2 and authenticate/3, all solutions of
 %   listed_pengine/3 are returned as the `pengines` member of a JSON
 %   object.
 http_pengine_list(Request) :-
     reply_options(Request, [get]),
     !.
 http_pengine_list(Request) :-
     http_parameters(Request,
                     [ status(Status, [default(detached), oneof([detached])]),
                       application(Application, [default(pengine_sandbox)])
                     ]),
     allowed(Request, Application),
     authenticate(Request, Application, _),
     findall(State, listed_pengine(Application, Status, State), States),
     Reply = json{pengines: States},
     reply_json(Reply).
 2756
 %!  listed_pengine(+Application, +Status, -State) is nondet.
 %
 %   State is a `pengine` dict describing a detached pengine of
 %   Application, with the keys `id`, `detached` (value of the
 %   detached property), `queued` (size of its output queue) and
 %   `stats`.
 listed_pengine(Application, detached, State) :-
     State = pengine{id:Id,
                     detached:Since,
                     queued:Pending,
                     stats:Stats},
     pengine_property(Id, application(Application)),
     pengine_property(Id, detached(Since)),
     pengine_queue(Id, Queue, _, _),
     message_queue_property(Queue, size(Pending)),
     listed_pengine_stats(Id, Stats).

 %   Stats is the thread_statistics/2 result for the thread of pengine
 %   Id, or `thread{status:died}` if there is no thread or its
 %   statistics cannot be obtained.
 listed_pengine_stats(Id, Stats) :-
     pengine_thread(Id, Thread),
     catch(thread_statistics(Thread, Stats), _, fail),
     !.
 listed_pengine_stats(_, thread{status:died}).
 output_result(+Format, +EventTerm) is det
 output_result(+Format, +EventTerm, +OptionsDict) is det
Formulate an HTTP response from a pengine event term. Format is one of prolog, json or json-s.
 :- dynamic pengine_replying/2.          % +Pengine, +Thread

 %!  output_result(+Format, +EventTerm) is det.
 %
 %   Write the HTTP reply for EventTerm in Format.  The first argument
 %   of EventTerm is the pengine id.  While the reply is written, the
 %   calling thread is registered in pengine_replying/2 so that
 %   abort_pending_output/1 can interrupt it; the resulting
 %   `pengine_abort_output` exception is caught here and ends the
 %   reply silently.
 output_result(Format, Event) :-
     thread_self(Me),
     arg(1, Event, Pengine),
     cors_enable,                        % contingent on http:cors setting
     disable_client_cache,
     Reply = output_result(Format, Event, _{}),
     setup_call_cleanup(
         asserta(pengine_replying(Pengine, Me), Ref),
         catch(Reply, pengine_abort_output, true),
         erase(Ref)).
 2794
 %!  output_result(+Lang, +Event, +Dict) is det.
 %
 %   Write Event as an HTTP reply.  The write_result/3 hook is tried
 %   first.  Otherwise `prolog` writes the event as a quoted term with
 %   a text/x-prolog content type and JSON dialects (json_lang/1)
 %   reply the dict produced by event_term_to_json_data/3.
 %
 %   @error domain_error(pengine_format, Lang) for any other Lang.
 output_result(Lang, Event, Dict) :-
     (   write_result(Lang, Event, Dict)
     ->  true
     ;   Lang = prolog
     ->  format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
         write_term(Event,
                    [ quoted(true),
                      ignore_ops(true),
                      fullstop(true),
                      blobs(portray),
                      portray_goal(portray_blob),
                      nl(true)
                    ])
     ;   json_lang(Lang)
     ->  (   event_term_to_json_data(Event, JSON, Lang)
         ->  reply_json(JSON)
         ;   % conversion failed: re-run it under assertion/1
             assertion(event_term_to_json_data(Event, _, Lang))
         )
     ;   % FIXME: allow for non-JSON format
         domain_error(pengine_format, Lang)
     ).
 portray_blob(+Blob, +Options) is det
Portray non-text blobs that may appear in output terms. Not really sure about that. Basically such terms need to be avoided as they are meaningless outside the process. The generated error is hard to debug though, so now we send them as '$BLOB'(Type). Future versions may include more info, depending on Type.
 :- public portray_blob/2.               % called from write-term

 %!  portray_blob(+Blob, +Options) is det.
 %
 %   Write Blob as the quoted term '$BLOB'(Type), where Type is the
 %   blob type reported by blob/2.  Options is ignored.
 portray_blob(Blob, _Options) :-
     blob(Blob, Type),
     Placeholder = '$BLOB'(Type),
     writeq(Placeholder).
 abort_pending_output(+Pengine) is det
If we get an abort, it is possible that output is being produced for the client. This predicate aborts these threads.
 %!  abort_pending_output(+Pengine) is det.
 %
 %   Interrupt every thread that is registered in pengine_replying/2
 %   as producing output for Pengine.
 abort_pending_output(Pengine) :-
     findall(Thread, pengine_replying(Pengine, Thread), Threads),
     maplist(abort_output_thread, Threads).
 2840
 %!  abort_output_thread(+Thread) is det.
 %
 %   Make Thread raise `pengine_abort_output`.  A thread that no
 %   longer exists is silently ignored.
 abort_output_thread(Thread) :-
     ThreadGone = error(existence_error(thread, _), _),
     catch(thread_signal(Thread, throw(pengine_abort_output)),
           ThreadGone,
           true).
 write_result(+Lang, +Event, +Dict) is semidet
Hook that allows for different output formats. The core Pengines library supports prolog and various JSON dialects. The hook event_to_json/3 can be used to refine the JSON dialects. This hook must be used if a completely different output format is desired.
 disable_client_cache
Make sure the client will not cache our page.
See also
- http://stackoverflow.com/questions/49547/making-sure-a-web-page-is-not-cached-across-all-browsers
 %!  disable_client_cache is det.
 %
 %   Write the HTTP header fields that tell the client not to cache
 %   the reply: Cache-Control, Pragma and Expires.
 disable_client_cache :-
     format('Cache-Control: no-cache, no-store, must-revalidate\r\n'),
     format('Pragma: no-cache\r\n'),
     format('Expires: 0\r\n').
 2864
 %!  event_term_to_json_data(+Event, -JSON, +Lang) is semidet.
 %
 %   Translate the pengine event term Event into a `json` dict for
 %   reply_json/1.  The event_to_json/3 hook is tried first.  The
 %   remaining clauses provide the default translation:
 %
 %     - success(ID, Bindings, Projection, Time, More), for Lang `json`
 %       only; the bindings are converted by term_to_json/2.
 %     - destroy(ID, Event): the nested event is translated recursively
 %       and stored under `data`.
 %     - create(ID, Features): an answer(Event) feature, if present, is
 %       translated recursively; the features become dict keys.
 %     - error(ID, ErrorTerm): `data` is the message text; code and
 %       location are added by add_error_details/3.
 %     - failure(ID, Time).
 %     - Any other event with one argument (the id) or two arguments
 %       (id and data, converted by term_to_json/2).
 %
 %   A second destroy(ID, Event) clause that duplicated the first has
 %   been removed: the first one commits with a cut, so the copy could
 %   never be reached.
 event_term_to_json_data(Event, JSON, Lang) :-
     event_to_json(Event, JSON, Lang),
     !.
 event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
                         json{event:success, id:ID, time:Time,
                              data:Bindings, more:More, projection:Projection},
                         json) :-
     !,
     term_to_json(Bindings0, Bindings).
 event_term_to_json_data(destroy(ID, Event),
                         json{event:destroy, id:ID, data:JSON},
                         Style) :-
     !,
     event_term_to_json_data(Event, JSON, Style).
 event_term_to_json_data(create(ID, Features0), JSON, Style) :-
     !,
     (   select(answer(First0), Features0, Features1)
     ->  event_term_to_json_data(First0, First, Style),
         Features = [answer(First)|Features1]
     ;   Features = Features0
     ),
     dict_create(JSON, json, [event(create), id(ID)|Features]).
 event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
     !,
     Error0 = json{event:error, id:ID, data:Message},
     add_error_details(ErrorTerm, Error0, Error),
     % Message is shared with Error, so this also fills in `data`
     message_to_string(ErrorTerm, Message).
 event_term_to_json_data(failure(ID, Time),
                         json{event:failure, id:ID, time:Time}, _) :-
     !.
 event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
     functor(EventTerm, F, 1),
     !,
     arg(1, EventTerm, ID).
 event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
     functor(EventTerm, F, 2),
     arg(1, EventTerm, ID),
     arg(2, EventTerm, Data),
     term_to_json(Data, JSON).
 2908
 2909:- public add_error_details/3.
 add_error_details(+Error, +JSON0, -JSON)
Add format error code and location information to an error. Also used by pengines_io.pl.
 %!  add_error_details(+Error, +JSON0, -JSON) is det.
 %
 %   Extend the dict JSON0 with the error code (add_error_code/3) and
 %   then with the source location (add_error_location/3) of Error.
 add_error_details(Error, JSON0, JSON) :-
     add_error_code(Error, JSON0, WithCode),
     add_error_location(Error, WithCode, JSON).
 add_error_code(+Error, +JSON0, -JSON) is det
Add a code field to JSON0 if Error is an ISO error term. The error code is the functor name of the formal part of the error, e.g., syntax_error, type_error, etc. Some errors carry more information:
existence_error(Type, Obj)
{arg1:Type, arg2:Obj}, where Obj is stringified if it is not atomic.
 %!  add_error_code(+Error, +JSON0, -JSON) is det.
 %
 %   Add a `code` key to the dict JSON0 if Error is error(Formal, _)
 %   with a callable Formal; the code is the functor name of Formal.
 %   For existence_error(Type, Obj) with atomic Type the keys `arg1`
 %   (Type) and `arg2` (Obj made atomic by to_atomic/2) are added as
 %   well.  Any other Error leaves the dict unchanged.
 add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :-
     atom(Type),
     !,
     to_atomic(Obj, Value),
     Details = _{code:existence_error, arg1:Type, arg2:Value},
     put_dict(Details, Error0, Error).
 add_error_code(error(Formal, _), Error0, Error) :-
     callable(Formal),
     !,
     functor(Formal, Code, _),
     put_dict(code, Error0, Code, Error).
 add_error_code(_, Error, Error).
 2942
 %!  to_atomic(+Obj, -Atomic) is det.
 %
 %   Atomic is Obj itself if Obj is an atom, number or string.  Any
 %   other term is written to a string.
 %
 %   The signature of term_string/2 is term_string(?String, ?Term), so
 %   the string is the first argument.  With the arguments the other
 %   way around a non-atomic Obj (e.g. the `Name/Arity` of a missing
 %   procedure) is treated as text to parse instead of a term to
 %   write.

 % What to do with large integers?
 to_atomic(Obj, Atomic) :- atom(Obj),   !, Atomic = Obj.
 to_atomic(Obj, Atomic) :- number(Obj), !, Atomic = Obj.
 to_atomic(Obj, Atomic) :- string(Obj), !, Atomic = Obj.
 to_atomic(Obj, Atomic) :- term_string(Atomic, Obj).
 add_error_location(+Error, +JSON0, -JSON) is det
Add a location property if the error can be associated with a source location. The location is an object with properties file and line and, if available, the character location in the line.
 %!  add_error_location(+Error, +JSON0, -JSON) is det.
 %
 %   Add a `location` key to the dict JSON0 if the context of Error is
 %   file(Path, Line, Ch, CharNo) with atomic Path and integer Line.
 %   The location holds `file` and `line`, plus `ch` unless Ch is -1.
 %   Any other Error leaves the dict unchanged.
 add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :-
     atom(Path), integer(Line),
     !,
     put_dict(location, Term0, _{file:Path, line:Line}, Term).
 add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :-
     atom(Path), integer(Line), integer(Ch),
     !,
     put_dict(location, Term0, _{file:Path, line:Line, ch:Ch}, Term).
 add_error_location(_, Term, Term).
 event_to_json(+Event, -JSONTerm, +Lang) is semidet
Hook that translates a Pengine event structure into a term suitable for reply_json/1, according to the language specification Lang. This can be used to massage general Prolog terms, notably associated with success(ID, Bindings, Projection, Time, More) and output(ID, Term) into a format suitable for processing at the client side.
 2975%:- multifile pengines:event_to_json/3.
 2976
 2977
 2978                 /*******************************
 2979                 *        ACCESS CONTROL        *
 2980                 *******************************/
 allowed(+Request, +Application) is det
Check whether the peer is allowed to connect. Returns a forbidden header if contact is not allowed.
 %!  allowed(+Request, +Application) is det.
 %
 %   Succeeds if the peer of Request matches the `allow_from` setting
 %   of Application and does not match its `deny_from` setting.
 %   Otherwise throws http_reply(forbidden(URI)), where URI is the
 %   request_uri of Request.
 allowed(Request, Application) :-
     (   setting(Application:allow_from, Allow),
         match_peer(Request, Allow),
         setting(Application:deny_from, Deny),
         \+ match_peer(Request, Deny)
     ->  true
     ;   memberchk(request_uri(Here), Request),
         throw(http_reply(forbidden(Here)))
     ).
 2996
 %!  match_peer(+Request, +Allowed) is nondet.
 %
 %   True if the peer of Request is accepted by the list Allowed.  A
 %   list containing `*` accepts every peer and the empty list accepts
 %   none.  Otherwise the peer must be a member of Allowed or match
 %   one of its members as a pattern (match_peer_pattern/2).
 match_peer(Request, Allowed) :-
     (   memberchk(*, Allowed)
     ->  true
     ;   Allowed = []
     ->  fail
     ;   http_peer(Request, Peer),
         debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
         (   memberchk(Peer, Allowed)
         ->  true
         ;   member(Pattern, Allowed),
             match_peer_pattern(Pattern, Peer)
         )
     ).
 3009
 %!  match_peer_pattern(+Pattern, +Peer) is semidet.
 %
 %   True if the ip_term/2 translations of Pattern and Peer unify.
 match_peer_pattern(Pattern, Peer) :-
     once(( ip_term(Pattern, IP),
            ip_term(Peer, IP)
          )).
 3014
 %!  ip_term(+Peer, ?Pattern) is semidet.
 %
 %   Pattern is the list of integers obtained by splitting the text
 %   Peer on "." and converting each part to a number.  A final part
 %   `*` leaves the rest of the list unbound, so that a pattern such
 %   as `'192.168.*'` unifies with every address with that prefix.
 ip_term(Peer, Pattern) :-
     split_string(Peer, ".", "", PartStrings),
     ip_pattern(PartStrings, Pattern).

 %!  ip_pattern(+Parts, ?Pattern) is semidet.
 %
 %   Convert the parts of a dotted address.  split_string/4 produces
 %   strings, so the wildcard part arrives as the string "*".  Testing
 %   for the atom `*` alone never matches it and makes every wildcard
 %   pattern fail; the atom is still accepted for callers that pass
 %   atoms.
 ip_pattern([], []).
 ip_pattern([Wild], _) :-
     (   Wild == "*"
     ;   Wild == (*)
     ),
     !.
 ip_pattern([S|T0], [N|T]) :-
     number_string(N, S),
     ip_pattern(T0, T).
 authenticate(+Request, +Application, -UserOptions:list) is det
Call authentication_hook/3, returning either [user(User)], [] or an exception.
 %!  authenticate(+Request, +Application, -UserOptions:list) is det.
 %
 %   UserOptions is `[user(User)]` if authentication_hook/3 succeeds
 %   with a ground User and `[]` if the hook fails.  Exceptions raised
 %   by the hook are passed on.
 %
 %   @error instantiation_error if the hook returns a non-ground User.
 authenticate(Request, Application, UserOptions) :-
     (   authentication_hook(Request, Application, User)
     ->  must_be(ground, User),
         UserOptions = [user(User)]
     ;   UserOptions = []
     ).
 authentication_hook(+Request, +Application, -User) is semidet
This hook is called from the =/pengine/create= HTTP handler to discover whether the server is accessed by an authorized user. It can react in three ways:
See also
- http_authenticate/3 can be used to implement this hook using default HTTP authentication data.
 %!  pengine_register_user(+Options) is det.
 %
 %   If Options contains user(User), record User for the current
 %   pengine in pengine_user/2.  Does nothing otherwise.
 pengine_register_user(Options) :-
     (   option(user(User), Options)
     ->  pengine_self(Me),
         asserta(pengine_user(Me, User))
     ;   true
     ).
 pengine_user(-User) is semidet
True when the pengine was created by an HTTP request that authorized User.
See also
- authentication_hook/3 can be used to extract authorization from the HTTP header.
 %!  pengine_user(-User) is semidet.
 %
 %   User is the user recorded in pengine_user/2 for the pengine that
 %   calls this predicate.
 pengine_user(User) :-
     pengine_self(Self),
     pengine_user(Self, User).
 reply_options(+Request, +Methods) is semidet
Reply the HTTP OPTIONS request
 %!  reply_options(+Request, +Methods) is semidet.
 %
 %   If Request is an HTTP OPTIONS request, enable CORS for Methods
 %   and reply with a text/plain header and an empty body.  Fails for
 %   any other request method.
 reply_options(Request, Allowed) :-
     option(method(options), Request),
     !,
     cors_enable(Request, [methods(Allowed)]),
     format('Content-type: text/plain\r\n~n').       % header, empty body
 3090
 3091
 3092                 /*******************************
 3093                 *        COMPILE SOURCE        *
 3094                 *******************************/
 pengine_src_text(+SrcText, +Module) is det
Asserts the clauses defined in SrcText in the private database of the current Pengine. This predicate processes the `src_text` option of pengine_create/1.
 %!  pengine_src_text(+SrcText, +Module) is det.
 %
 %   Load the program text SrcText into Module on behalf of the
 %   current pengine.  The text is loaded silently from a stream
 %   under the source id `pengine://<Pengine>/src`, with the extra
 %   options of extra_load_options/2.  Afterwards keep_source/3 decides
 %   whether the text is retained.
 pengine_src_text(Src, Module) :-
     pengine_self(Self),
     extra_load_options(Self, ExtraOptions),
     format(atom(ID), 'pengine://~w/src', [Self]),
     LoadOptions = [ stream(In),
                     module(Module),
                     silent(true)
                   | ExtraOptions
                   ],
     setup_call_cleanup(
         open_chars_stream(Src, In),
         load_files(Module:ID, LoadOptions),
         close(In)),
     keep_source(Self, ID, Src).
 3117
 %!  system:'#file'(+File, +Line) is det.
 %
 %   Set the file name of the stream that is currently being loaded
 %   to File, then switch position recording off and on again.
 %   NOTE(review): toggling record_position presumably resets the
 %   stream position; Line is not used -- confirm intent.
 system:'#file'(File, _Line) :-
     prolog_load_context(stream, Stream),
     maplist(set_stream(Stream),
             [ file_name(File),
               record_position(false),
               record_position(true)
             ]).
 pengine_src_url(+URL, +Module) is det
Asserts the clauses defined in URL in the private database of the current Pengine. This predicate processes the `src_url` option of pengine_create/1.
To be done
- Make a sensible guess at the encoding.
 %!  pengine_src_url(+URL, +Module) is det.
 %
 %   Load the program at URL into Module on behalf of the current
 %   pengine, under the source id `pengine://<Pengine>/url/<Path>`.
 %   The content is read as UTF-8.  If the `debug_info` setting of the
 %   pengine's application is `false`, the program is loaded directly
 %   from the HTTP stream.  Otherwise the text is fetched first, loaded
 %   from a stream over that text and handed to keep_source/3.
 pengine_src_url(URL, Module) :-
     pengine_self(Self),
     uri_encoded(path, URL, Path),
     format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
     extra_load_options(Self, Options),
     (   get_pengine_application(Self, Application),
         setting(Application:debug_info, false)
     ->  setup_call_cleanup(
             http_open(URL, Stream, []),
             ( set_stream(Stream, encoding(utf8)),
               pengine_load_stream(Module, ID, Stream, Options)
             ),
             close(Stream))
     ;   pengine_fetch_url(URL, Src),
         setup_call_cleanup(
             open_chars_stream(Src, Stream),
             pengine_load_stream(Module, ID, Stream, Options),
             close(Stream)),
         keep_source(Self, ID, Src)
     ).

 %   Load source id ID into Module from Stream with the extra load
 %   options Options.
 pengine_load_stream(Module, ID, Stream, Options) :-
     load_files(Module:ID,
                [ stream(Stream),
                  module(Module)
                | Options
                ]).

 %   Src is the content of URL as a string, read as UTF-8.
 pengine_fetch_url(URL, Src) :-
     setup_call_cleanup(
         http_open(URL, TempStream, []),
         ( set_stream(TempStream, encoding(utf8)),
           read_string(TempStream, _, Src)
         ),
         close(TempStream)).
 3165
 3166
 %!  extra_load_options(+Pengine, -Options) is det.
 %
 %   Options is `[]` if Pengine is registered as not sandboxed and
 %   `[sandboxed(true)]` otherwise.
 extra_load_options(Pengine, Options) :-
     (   pengine_not_sandboxed(Pengine)
     ->  Options = []
     ;   Options = [sandboxed(true)]
     ).
 3172
 3173
 %!  keep_source(+Pengine, +ID, +SrcText) is det.
 %
 %   If the `debug_info` setting of the application of Pengine is
 %   `true`, store SrcText as a string in
 %   pengine_data(Pengine, source(ID, String)).  Does nothing
 %   otherwise.
 keep_source(Pengine, ID, SrcText) :-
     (   get_pengine_application(Pengine, Application),
         setting(Application:debug_info, true)
     ->  to_string(SrcText, SrcString),
         assertz(pengine_data(Pengine, source(ID, SrcString)))
     ;   true
     ).
 3181
 %!  to_string(+Text, -String) is semidet.
 %
 %   String is Text if Text is already a string, and the conversion
 %   of Text by atom_string/2 otherwise.
 to_string(Text, String) :-
     (   Text = String,
         string(Text)
     ->  true
     ;   atom_string(Text, String)
     ->  true
     ).
 3188
 3189		 /*******************************
 3190		 *            SANDBOX		*
 3191		 *******************************/
 3192
 %   Declare the pengine I/O predicates as safe for sandboxed code.
 :- multifile sandbox:safe_primitive/1.

 sandbox:safe_primitive(pengines:pengine_input(_, _)).
 sandbox:safe_primitive(pengines:pengine_output(_)).
 sandbox:safe_primitive(pengines:pengine_debug(_, _)).
 3199
 3200
 3201                 /*******************************
 3202                 *            MESSAGES          *
 3203                 *******************************/
 3204
 3205prolog:error_message(sandbox(time_limit_exceeded, Limit)) -->
 3206    [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl,
 3207      'This is normally caused by an insufficiently instantiated'-[], nl,
 3208      'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl,
 3209      'find all possible instantations of Var.'-[]
 3210    ]