. (utf8) 2/* Part of SWI-Prolog 3 4 Author: Torbjörn Lager and Jan Wielemaker 5 E-mail: J.Wielemaker@vu.nl 6 WWW: http://www.swi-prolog.org 7 Copyright (C): 2014-2023, Torbjörn Lager, 8 VU University Amsterdam 9 SWI-Prolog Solutions b.v. 10 All rights reserved. 11 12 Redistribution and use in source and binary forms, with or without 13 modification, are permitted provided that the following conditions 14 are met: 15 16 1. Redistributions of source code must retain the above copyright 17 notice, this list of conditions and the following disclaimer. 18 19 2. Redistributions in binary form must reproduce the above copyright 20 notice, this list of conditions and the following disclaimer in 21 the documentation and/or other materials provided with the 22 distribution. 23 24 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 32 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 33 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 34 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 POSSIBILITY OF SUCH DAMAGE. 36*/ 37 38:- module(pengines, 39 [ pengine_create/1, % +Options 40 pengine_ask/3, % +Pengine, :Query, +Options 41 pengine_next/2, % +Pengine. +Options 42 pengine_stop/2, % +Pengine. 
+Options 43 pengine_event/2, % -Event, +Options 44 pengine_input/2, % +Prompt, -Term 45 pengine_output/1, % +Term 46 pengine_respond/3, % +Pengine, +Input, +Options 47 pengine_debug/2, % +Format, +Args 48 pengine_self/1, % -Pengine 49 pengine_pull_response/2, % +Pengine, +Options 50 pengine_destroy/1, % +Pengine 51 pengine_destroy/2, % +Pengine, +Options 52 pengine_abort/1, % +Pengine 53 pengine_application/1, % +Application 54 current_pengine_application/1, % ?Application 55 pengine_property/2, % ?Pengine, ?Property 56 pengine_user/1, % -User 57 pengine_event_loop/2, % :Closure, +Options 58 pengine_rpc/2, % +Server, :Goal 59 pengine_rpc/3 % +Server, :Goal, +Options 60 ]).
71:- autoload(library(aggregate),[aggregate_all/3]). 72:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]). 73:- autoload(library(broadcast),[broadcast/1]). 74:- autoload(library(charsio),[open_chars_stream/2]). 75:- use_module(library(debug),[debug/1,debugging/1,debug/3,assertion/1]). 76:- autoload(library(error), 77 [ must_be/2, 78 existence_error/2, 79 permission_error/3, 80 domain_error/2 81 ]). 82:- autoload(library(filesex),[directory_file_path/3]). 83:- autoload(library(listing),[listing/1]). 84:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]). 85:- autoload(library(modules),[in_temporary_module/3]). 86:- autoload(library(occurs),[sub_term/2]). 87:- autoload(library(option), 88 [select_option/3,option/2,option/3,select_option/4]). 89:- autoload(library(prolog_stack),[print_prolog_backtrace/2]). 90:- autoload(library(sandbox),[safe_goal/1]). 91:- autoload(library(statistics),[thread_statistics/2]). 92:- autoload(library(term_to_json),[term_to_json/2]). 93:- autoload(library(thread_pool), 94 [thread_pool_create/3,thread_create_in_pool/4]). 95:- autoload(library(time),[alarm/4,call_with_time_limit/2]). 96:- autoload(library(uri), 97 [ uri_components/2, 98 uri_query_components/2, 99 uri_data/3, 100 uri_data/4, 101 uri_encoded/3 102 ]). 103:- autoload(library(http/http_client),[http_read_data/3]). 104:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]). 105:- autoload(library(http/http_dispatch), 106 [http_handler/3,http_404/2,http_reply_file/3]). 107:- autoload(library(http/http_open),[http_open/3]). 108:- autoload(library(http/http_parameters),[http_parameters/2]). 109:- autoload(library(http/http_stream),[is_cgi_stream/1]). 110:- autoload(library(http/http_wrapper),[http_peer/2]). 111 112:- use_module(library(settings),[setting/2,setting/4]). 113:- use_module(library(http/http_json), 114 [http_read_json_dict/2,reply_json/1]). 115 116:- if(exists_source(library(uuid))). 117:- autoload(library(uuid), [uuid/2]). 
:- endif.


% Meta-argument declarations for the exported meta predicates.
% pengine_create/1 receives a module-qualified option list (its clause
% head is pengine_create(M:Options0)).  The export list documents the
% goal of pengine_rpc/3 and the closure of pengine_event_loop/2 as
% module sensitive.
% NOTE(review): the closure of pengine_event_loop/2 is declared as `1`
% (called with one extra argument); confirm against its definition.
:- meta_predicate
    pengine_create(:),
    pengine_rpc(+, :, +),
    pengine_event_loop(1, +).

% Hooks that may be extended from other modules.
:- multifile
    write_result/3,                 % +Format, +Event, +Dict
    event_to_json/3,                % +Event, -JSON, +Format
    prepare_module/3,               % +Module, +Application, +Options
    prepare_goal/3,                 % +GoalIn, -GoalOut, +Options
    authentication_hook/3,          % +Request, +Application, -User
    not_sandboxed/2.                % +User, +App

% Option declarations.  The second argument of predicate_options/3 is
% the position of the option list in the predicate's arguments.
:- predicate_options(pengine_create/1, 1,
                     [ id(-atom),
                       alias(atom),
                       application(atom),
                       destroy(boolean),
                       server(atom),
                       ask(compound),
                       template(compound),
                       chunk(integer;oneof([false])),
                       bindings(list),
                       src_list(list),
                       src_text(any),           % text
                       src_url(atom),
                       src_predicates(list)
                     ]).
:- predicate_options(pengine_ask/3, 3,
                     [ template(any),
                       chunk(integer;oneof([false])),
                       bindings(list)
                     ]).
:- predicate_options(pengine_next/2, 2,
                     [ chunk(integer),
                       pass_to(pengine_send/3, 3)
                     ]).
:- predicate_options(pengine_stop/2, 2,
                     [ pass_to(pengine_send/3, 3)
                     ]).
% The option list of pengine_respond/3 is its third argument
% (+Pengine, +Input, +Options).
:- predicate_options(pengine_respond/3, 3,
                     [ pass_to(pengine_send/3, 3)
                     ]).
:- predicate_options(pengine_rpc/3, 3,
                     [ chunk(integer;oneof([false])),
                       pass_to(pengine_create/1, 1)
                     ]).
:- predicate_options(pengine_send/3, 3,
                     [ delay(number)
                     ]).
:- predicate_options(pengine_event/2, 2,
                     [ listen(atom),
                       pass_to(system:thread_get_message/3, 3)
                     ]).
:- predicate_options(pengine_pull_response/2, 2,
                     [ pass_to(http_open/3, 3)
                     ]).
:- predicate_options(pengine_event_loop/2, 2,
                     []).                       % not yet implemented

% :- debug(pengine(transition)).
:- debug(pengine(debug)).               % handle pengine_debug in pengine_rpc/3.

% Compile calls to `random_delay` away unless the debug topic
% pengine(delay) is enabled while this file is loaded.
goal_expansion(random_delay, Expanded) :-
    (   debugging(pengine(delay))
    ->  Expanded = do_random_delay
    ;   Expanded = true
    ).

% Sleep for random(20)/1000 seconds.
do_random_delay :-
    Delay is random(20)/1000,
    sleep(Delay).

% Meta-argument declarations for internal predicates.  solve/4 and
% findnsols_no_empty/4 call their third argument as a goal.
% NOTE(review): pengine_event_loop/3 is not defined in the visible part
% of the file; its closure is assumed to be the second argument, called
% with one extra argument.
:- meta_predicate                       % internal meta predicates
    solve(+, ?, 0, +),
    findnsols_no_empty(+, ?, 0, -),
    pengine_event_loop(+, 1, +).
Remaining options are passed to http_open/3 (meaningful only for non-local pengines) and thread_create/3. Note that for thread_create/3 only options changing the stack-sizes can be used. In particular, do not pass the detached or alias options.
Successful creation of a pengine will return an event term of the following form:
An error will be returned if the pengine could not be created:
%!  pengine_create(:Options)
%
%   Create a pengine from the module-qualified Options.  Local source
%   options are first merged by translate_local_sources/3.  If the
%   resulting list contains server(BaseURL), a remote pengine is created
%   with the remaining options; otherwise a local one is created.

pengine_create(Module:RawOptions) :-
    translate_local_sources(RawOptions, Options, Module),
    (   select_option(server(BaseURL), Options, RemoteOptions)
    ->  remote_pengine_create(BaseURL, RemoteOptions)
    ;   local_pengine_create(Options)
    ).
src_predicates
and src_list
options into
src_text
. We need to do that anyway for remote pengines. For
local pengines, we could avoid this step, but there is very
little point in transferring source to a local pengine anyway as
local pengines can access any Prolog predicate that you make
visible to the application.
Multiple sources are concatenated to end up with a single src_text option.
%!  translate_local_sources(+OptionsIn, -Options, +Module)
%
%   Replace all src_predicates/1, src_list/1 and src_text/1 options in
%   OptionsIn by at most one src_text/1 option at the head of Options.
%   Multiple sources are concatenated in the order they appear.

translate_local_sources(OptionsIn, Options, Module) :-
    translate_local_sources(OptionsIn, Sources, Rest, Module),
    (   Sources == []
    ->  Options = Rest
    ;   Sources = [Single]
    ->  Options = [src_text(Single)|Rest]
    ;   atomics_to_string(Sources, Joined)
    ->  Options = [src_text(Joined)|Rest]
    ).

%   translate_local_sources(+OptionsIn, -Sources, -OtherOptions, +Module)
%
%   Split OptionsIn into the text of each source option and the
%   options that are not source options (unbound elements are kept as
%   ordinary options).

translate_local_sources([], [], [], _).
translate_local_sources([Option|More], Sources, Others, Module) :-
    (   nonvar(Option),
        translate_local_source(Option, Source, Module)
    ->  Sources = [Source|Sources1],
        Others = Others1
    ;   Sources = Sources1,
        Others = [Option|Others1]
    ),
    translate_local_sources(More, Sources1, Others1, Module).

%   translate_local_source(+Option, -Source, +Module)
%
%   Source is the program text denoted by a single source option.

translate_local_source(src_predicates(PIs), Source, Module) :-
    must_be(list, PIs),
    with_output_to(string(Source),
                   maplist(list_in_module(Module), PIs)).
translate_local_source(src_list(Terms), Source, _) :-
    must_be(list, Terms),
    with_output_to(string(Source),
                   forall(member(Term, Terms),
                          format('~k .~n', [Term]))).
translate_local_source(src_text(Source), Source, _).

% List the predicate PI as defined in Module on current output.
list_in_module(Module, PI) :-
    listing(Module:PI).
pengine_send(NameOrID, Term, [])
.
*/
%!  pengine_send(+NameOrID, +Term)
%
%   Send Term to the pengine NameOrID using an empty option list.

pengine_send(NameOrID, Term) :-
    pengine_send(NameOrID, Term, []).
Any remaining options are passed to http_open/3. */
%!  pengine_send(+Target, +Event, +Options)
%
%   Send Event to Target, which must be an atom.  The atom `self`
%   addresses the message queue of the calling thread; a name known
%   through child/2 is replaced by the child it denotes.  With the
%   option delay(Delay) the event is sent from an alarm after Delay
%   seconds.

pengine_send(Target, Event, Options) :-
    must_be(atom, Target),
    pengine_send2(Target, Event, Options).

pengine_send2(self, Event, Options) :-
    !,
    thread_self(Me),
    delay_message(queue(Me), Event, Options).
pengine_send2(Name, Event, Options) :-
    (   child(Name, Child)
    ->  Pengine = Child
    ;   Pengine = Name
    ),
    delay_message(pengine(Pengine), Event, Options).

% Send now, or schedule the send if a delay(Delay) option is given.
delay_message(Target, Event, Options) :-
    (   option(delay(Delay), Options)
    ->  alarm(Delay,
              send_message(Target, Event, Options),
              _AlarmID,
              [remove(true)])
    ;   random_delay,
        send_message(Target, Event, Options)
    ).

% Deliver Event to a thread queue, a remote pengine or a local pengine
% thread.  Raises existence_error(pengine, Pengine) if the pengine is
% neither remote nor has a thread.
send_message(queue(Queue), Event, _) :-
    thread_send_message(Queue, pengine_request(Event)).
send_message(pengine(Pengine), Event, Options) :-
    (   pengine_remote(Pengine, Server)
    ->  remote_pengine_send(Server, Pengine, Event, Options)
    ;   pengine_thread(Pengine, Thread)
    ->  thread_send_message(Thread, pengine_request(Event))
    ;   existence_error(pengine, Pengine)
    ).
%!  pengine_request(-Request)
%
%   Wait for the next pengine_request(Request) message on the queue of
%   the calling thread.  The first wait lasts one second.  After that
%   the thread waits for the remainder of the application's
%   `idle_limit` setting while using thread_idle/2 to minimise
%   resources.  If no request arrives in time, Request is unified with
%   `destroy`.

pengine_request(Request) :-
    thread_self(Me),
    (   thread_get_message(Me, pengine_request(Request), [timeout(1)])
    ->  true
    ;   pengine_self(Self),
        get_pengine_application(Self, Application),
        setting(Application:idle_limit, IdleLimit0),
        Remaining is IdleLimit0-1,          % one second was spent above
        thread_self(Me),
        (   thread_idle(thread_get_message(Me, pengine_request(Request),
                                           [timeout(Remaining)]),
                        long)
        ->  true
        ;   Request = destroy
        )
    ).
If the message cannot be sent within the idle_limit
setting of
the pengine, abort the pengine.
%!  pengine_reply(+Event)
%!  pengine_reply(+Queue, +Event)
%
%   Send Event as pengine_event(ID, Event) to Queue, which defaults to
%   the parent queue.  ID is the first argument of Event.  Nothing is
%   sent once the global variable `pengine_idle_limit_exceeded` is
%   `true`.  If the caller is the pengine ID itself and it is not
%   detached, the send is limited by the application's `idle_limit`
%   setting.  On timeout the variable is set, the thread detaches
%   itself and calls abort/0.

pengine_reply(Event) :-
    pengine_parent(Queue),
    pengine_reply(Queue, Event).

pengine_reply(Queue, Event0) :-
    (   nb_current(pengine_idle_limit_exceeded, true)
    ->  true
    ;   arg(1, Event0, ID),
        wrap_first_answer(ID, Event0, Event),
        random_delay,
        debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
        (   pengine_self(ID),
            \+ pengine_detached(ID, _)
        ->  get_pengine_application(ID, Application),
            setting(Application:idle_limit, IdleLimit),
            debug(pengine(reply), 'Sending ~p, timeout: ~q', [Event, IdleLimit]),
            (   thread_send_message(Queue, pengine_event(ID, Event),
                                    [ timeout(IdleLimit)
                                    ])
            ->  true
            ;   thread_self(Me),
                debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
                      [ID, Me]),
                nb_setval(pengine_idle_limit_exceeded, true),
                thread_detach(Me),
                abort
            )
        ;   thread_send_message(Queue, pengine_event(ID, Event))
        )
    ).

% If a create event is pending for ID, embed Event0 in it as
% answer(Event0) and consume the pending record; otherwise pass the
% event through unchanged.
wrap_first_answer(ID, Event0, CreateEvent) :-
    wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]),
    arg(1, CreateEvent, ID),
    !,
    retract(wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)])).
wrap_first_answer(_ID, Event, Event).


% Discard all messages that are waiting in the parent queue.
empty_queue :-
    pengine_parent(Queue),
    empty_queue(Queue, 0, Discarded),
    debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).

% Count is Count0 plus the number of messages removed from Queue.
empty_queue(Queue, Count0, Count) :-
    (   thread_get_message(Queue, _Term, [timeout(0)])
    ->  Count1 is Count0+1,
        empty_queue(Queue, Count1, Count)
    ;   Count = Count0
    ).
Options is a list of options:
false
, the
Pengine goal is not executed using findall/3 and friends and
we do not backtrack immediately over the goal. As a result,
changes to backtrackable global state are retained. This is
similar to using set_prolog_flag(toplevel_mode, recursive)
.Name = Var
terms, providing access to the actual variable
names. Any remaining options are passed to pengine_send/3.
Note that the predicate pengine_ask/3 is deterministic, even for queries that have more than one solution. Also, the variables in Query will not be bound. Instead, results will be returned in the form of event terms.
true
or false
, indicating whether we can expect the
pengine to be able to return more solutions or not, should we call
pengine_next/2. Defined in terms of pengine_send/3, like so:
pengine_ask(ID, Query, Options) :- partition(pengine_ask_option, Options, AskOptions, SendOptions), pengine_send(ID, ask(Query, AskOptions), SendOptions).
*/
%!  pengine_ask(+NameOrID, +Query, +Options)
%
%   Ask the pengine to run Query.  Options accepted by
%   pengine_ask_option/1 travel with the request as
%   ask(Query, AskOptions); all other options are passed to
%   pengine_send/3.

pengine_ask(ID, Query, Options) :-
    partition(pengine_ask_option, Options, ForAsk, ForSend),
    pengine_send(ID, ask(Query, ForAsk), ForSend).


% Options that are part of the ask/2 request itself.
pengine_ask_option(template(_)).
pengine_ask_option(chunk(_)).
pengine_ask_option(bindings(_)).
pengine_ask_option(breakpoints(_)).
chunk(false)
. Remaining options are passed to pengine_send/3. The result of re-executing the current goal is returned to the caller's message queue in the form of event terms.
Defined in terms of pengine_send/3, as follows:
pengine_next(ID, Options) :- pengine_send(ID, next, Options).
*/
%!  pengine_next(+NameOrID, +Options)
%
%   Ask the pengine for the next solution(s).  With the option
%   chunk(Count) the request is next(Count) and the option is removed
%   before calling pengine_send/3; otherwise the request is `next`.

pengine_next(ID, Options) :-
    (   select_option(chunk(Count), Options, SendOptions)
    ->  pengine_send(ID, next(Count), SendOptions)
    ;   pengine_send(ID, next, Options)
    ).
Defined in terms of pengine_send/3, like so:
pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
*/
%!  pengine_stop(+NameOrID, +Options)
%
%   Send the request `stop` to the pengine.  Options are passed to
%   pengine_send/3.

pengine_stop(ID, Options) :-
    pengine_send(ID, stop, Options).
%!  pengine_abort(+NameOrID)
%
%   Abort the query running in a pengine.  A name known through
%   child/2 is first replaced by the child it denotes.  A remote
%   pengine is aborted through remote_pengine_abort/3.  For a local
%   pengine the exception `abort_query` is injected into its thread;
%   any error raised while signalling is ignored.  Fails if the pengine
%   is neither remote nor has a thread.

pengine_abort(Name) :-
    (   child(Name, Child)
    ->  Pengine = Child
    ;   Pengine = Name
    ),
    (   pengine_remote(Pengine, Server)
    ->  remote_pengine_abort(Server, Pengine, [])
    ;   pengine_thread(Pengine, Thread),
        debug(pengine(abort), 'Signalling thread ~p', [Thread]),
        catch(thread_signal(Thread, throw(abort_query)), _, true)
    ).
%!  pengine_destroy(+NameOrID)
%!  pengine_destroy(+NameOrID, +Options)
%
%   Destroy a pengine.  A name known through child/2 is first replaced
%   by the child id it denotes.  If Options contains force(true), the
%   pengine is killed using abort/0 and pengine_destroy/2 succeeds,
%   also if the thread no longer exists.  Otherwise the request
%   `destroy` is sent to the pengine.  If the pengine does not exist
%   the call still succeeds, after removing its child/2 registration.
%
%   The alias is resolved before sending because the existence error
%   raised by pengine_send/3 carries the resolved id, not the alias.
%   Matching on the resolved id makes the clean-up work for aliases
%   as well.

pengine_destroy(ID) :-
    pengine_destroy(ID, []).

pengine_destroy(Name, Options) :-
    (   child(Name, Child)
    ->  ID = Child
    ;   ID = Name
    ),
    (   option(force(true), Options)
    ->  (   pengine_thread(ID, Thread)
        ->  catch(thread_signal(Thread, abort),
                  error(existence_error(thread, _), _), true)
        ;   true
        )
    ;   catch(pengine_send(ID, destroy),
              error(existence_error(pengine, ID), _),
              retractall(child(_,ID)))
    ).


/*================= pengines administration =======================
*/
thread(ThreadId)
remote(URL)
% Global administration of pengines.  The tables are volatile.  The
% child/2 table is local to each thread.
:- dynamic
    current_pengine/6,              % Id, ParentId, Thread, URL, App, Destroy
    pengine_queue/4,                % Id, Queue, TimeOut, Time
    output_queue/3,                 % Id, Queue, Time
    pengine_user/2,                 % Id, User
    pengine_data/2,                 % Id, Data
    pengine_detached/2.             % Id, Data
:- volatile
    current_pengine/6,
    pengine_queue/4,
    output_queue/3,
    pengine_user/2,
    pengine_data/2,
    pengine_detached/2.

:- thread_local
    child/2.                        % ?Name, ?Child
%!  pengine_register_local(+Id, +Thread, +Queue, +URL, +Application, +Destroy)
%!  pengine_register_remote(+Id, +URL, +Application, +Destroy)
%
%   Record a pengine in current_pengine/6.  A remote pengine is stored
%   with thread `0` and the calling thread as its queue.

pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
    asserta(current_pengine(Id, Queue, Thread, URL, Application, Destroy)).

pengine_register_remote(Id, URL, Application, Destroy) :-
    thread_self(Me),
    asserta(current_pengine(Id, Me, 0, URL, Application, Destroy)).
%!  pengine_unregister(+Id)
%
%   Remove the registration of pengine Id, which must run in the
%   calling thread.  If the pengine is registered with URL `http`, the
%   queue is the message queue used to send events to the HTTP
%   workers.  That queue is handed to sync_destroy_queue_from_pengine/2
%   under the mutex `pengine` before the registration, user and data
%   records are removed.

pengine_unregister(Id) :-
    thread_self(Me),
    (   current_pengine(Id, Queue, Me, http, _, _)
    ->  with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue))
    ;   true
    ),
    retractall(current_pengine(Id, _, Me, _, _, _)),
    retractall(pengine_user(Id, _)),
    retractall(pengine_data(Id, _)).

%!  pengine_unregister_remote(+Id)
%
%   Remove the registration of the remote pengine Id (thread `0`).

pengine_unregister_remote(Id) :-
    retractall(current_pengine(Id, _Parent, 0, _, _, _)).
%!  pengine_self(-Id)
%
%   Id is a pengine registered for the calling thread.

pengine_self(Id) :-
    thread_self(Me),
    current_pengine(Id, _Parent, Me, _URL, _Application, _Destroy).

% Parent is the value of the global variable `pengine_parent`.
pengine_parent(Parent) :-
    nb_getval(pengine_parent, Parent).

% Thread is the thread of the first registration of Pengine that is
% not remote (remote pengines are registered with thread 0).
pengine_thread(Pengine, Thread) :-
    once(( current_pengine(Pengine, _Parent, Thread, _URL, _Application, _Destroy),
           Thread \== 0
         )).

% Pengine is registered as remote (thread 0) at URL.
pengine_remote(Pengine, URL) :-
    current_pengine(Pengine, _Parent, 0, URL, _Application, _Destroy).

% Application of the first registration of Pengine.
get_pengine_application(Pengine, Application) :-
    once(current_pengine(Pengine, _Parent, _, _URL, Application, _Destroy)).

% The module of a pengine is named after the pengine itself.
get_pengine_module(Pengine, Pengine).

% Create a fresh pengine identifier.  Without uuid/2 the identifier is
% the text of a random non-negative integer.
:- if(current_predicate(uuid/2)).
pengine_uuid(Id) :-
    uuid(Id, [version(4)]).             % Version 4 is random.
:- else.
pengine_uuid(Id) :-
    (   current_prolog_flag(max_integer, MaxInt)
    ->  Max is MaxInt-1
    ;   Max is 1<<128
    ),
    random_between(0, Max, Num),
    atom_number(Id, Num).
:- endif.
This also runs Goal if the Pengine no longer exists. This deals with Pengines terminated through destroy_or_continue/1.
%!  protect_pengine(+Id, :Goal)
%
%   Run Goal as once/1 while holding one of 64 mutexes named
%   `pengine_done_N`, where N is selected from the term hash of Id.
%   Goal is run whether or not the pengine still has a thread; the
%   conditional below only makes the two cases explicit.

:- meta_predicate protect_pengine(+, 0).

protect_pengine(Id, Goal) :-
    term_hash(Id, Hash),
    LockN is Hash mod 64,
    atom_concat(pengine_done_, LockN, Lock),
    with_mutex(Lock,
               (   pengine_thread(Id, _)
               ->  Goal
               ;   Goal
               )).
pengine_sandbox
. The example below creates a new application
address_book
and imports the API defined in the module file
adress_book_api.pl
into the application.
:- pengine_application(address_book). :- use_module(address_book:adress_book_api).
*/
%!  pengine_application(+Application)
%
%   Calling this predicate as a goal always raises a context error
%   `nodirective`.  This file declares system:term_expansion/2 and
%   current_application/1 as multifile to handle it as a directive.

pengine_application(Application) :-
    Culprit = pengine_application(Application),
    throw(error(context_error(nodirective, Culprit), _)).

:- multifile
    system:term_expansion/2,
    current_application/1.
%!  current_pengine_application(?Application)
%
%   True when Application is recorded in current_application/1.

current_pengine_application(Application) :-
    current_application(Application).


% Default settings for all applications

:- setting(thread_pool_size, integer, 100,
           'Maximum number of pengines this application can run.').
:- setting(thread_pool_stacks, list(compound), [],
           'Maximum stack sizes for pengines this application can run.').
:- setting(slave_limit, integer, 3,
           'Maximum number of slave pengines a master pengine can create.').
:- setting(time_limit, number, 300,
           'Maximum time to wait for output').
:- setting(idle_limit, number, 300,
           'Pengine auto-destroys when idle for this time').
:- setting(safe_goal_limit, number, 10,
           'Maximum time to try proving safety of the goal').
:- setting(program_space, integer, 100_000_000,
           'Maximum memory used by predicates').
:- setting(allow_from, list(atom), [*],
           'IP addresses from which remotes are allowed to connect').
:- setting(deny_from, list(atom), [],
           'IP addresses from which remotes are NOT allowed to connect').
:- setting(debug_info, boolean, false,
           'Keep information to support source-level debugging').


% Expand the directive `:- pengine_application(Application)` into the
% fact pengines:current_application(Application) followed by one
% setting per application, each defaulting to the corresponding
% setting of module `pengines`.  Raises a permission error if
% Application is already a module loaded from a file.
%
% The head must be qualified with `system:` to extend the multifile
% hook system:term_expansion/2 declared by this file.  Without the
% qualification the directive is not expanded and
% pengine_application/1 raises a context error.

system:term_expansion((:- pengine_application(Application)), Expanded) :-
    must_be(atom, Application),
    (   module_property(Application, file(_))
    ->  permission_error(create, pengine_application, Application)
    ;   true
    ),
    expand_term((:- setting(Application:thread_pool_size, integer,
                            setting(pengines:thread_pool_size),
                            'Maximum number of pengines this \c
                            application can run.')),
                ThreadPoolSizeSetting),
    expand_term((:- setting(Application:thread_pool_stacks, list(compound),
                            setting(pengines:thread_pool_stacks),
                            'Maximum stack sizes for pengines \c
                            this application can run.')),
                ThreadPoolStacksSetting),
    expand_term((:- setting(Application:slave_limit, integer,
                            setting(pengines:slave_limit),
                            'Maximum number of local slave pengines \c
                            a master pengine can create.')),
                SlaveLimitSetting),
    expand_term((:- setting(Application:time_limit, number,
                            setting(pengines:time_limit),
                            'Maximum time to wait for output')),
                TimeLimitSetting),
    expand_term((:- setting(Application:idle_limit, number,
                            setting(pengines:idle_limit),
                            'Pengine auto-destroys when idle for this time')),
                IdleLimitSetting),
    expand_term((:- setting(Application:safe_goal_limit, number,
                            setting(pengines:safe_goal_limit),
                            'Maximum time to try proving safety of the goal')),
                SafeGoalLimitSetting),
    expand_term((:- setting(Application:program_space, integer,
                            setting(pengines:program_space),
                            'Maximum memory used by predicates')),
                ProgramSpaceSetting),
    expand_term((:- setting(Application:allow_from, list(atom),
                            setting(pengines:allow_from),
                            'IP addresses from which remotes are allowed \c
                            to connect')),
                AllowFromSetting),
    expand_term((:- setting(Application:deny_from, list(atom),
                            setting(pengines:deny_from),
                            'IP addresses from which remotes are NOT \c
                            allowed to connect')),
                DenyFromSetting),
    expand_term((:- setting(Application:debug_info, boolean,
                            setting(pengines:debug_info),
                            'Keep information to support source-level \c
                            debugging')),
                DebugInfoSetting),
    flatten([ pengines:current_application(Application),
              ThreadPoolSizeSetting,
              ThreadPoolStacksSetting,
              SlaveLimitSetting,
              TimeLimitSetting,
              IdleLimitSetting,
              SafeGoalLimitSetting,
              ProgramSpaceSetting,
              AllowFromSetting,
              DenyFromSetting,
              DebugInfoSetting
            ], Expanded).

% Register default application

:- pengine_application(pengine_sandbox).
%!  pengine_property(?Pengine, ?Property)
%
%   True when Property is a property of Pengine.  If both arguments are
%   instantiated, at most one solution is produced.  Properties:
%
%     - self(Id), module(Id): the identifier of the pengine.
%     - alias(Name): Name was provided as the `alias` option when
%       creating the pengine.  It is taken from child/2 and differs
%       from the identifier.
%     - thread(Thread): thread of a pengine that is not remote.
%     - remote(Server): server of a remote pengine.
%     - application(Application)
%     - destroy(Destroy): `true` if the pengine is destroyed
%       automatically after completing the query.
%     - parent(Parent)
%     - source(SourceID, Source): taken from pengine_data/2.
%       NOTE(review): presumably only available if the setting
%       `debug_info` is present; confirm where pengine_data/2 is
%       asserted.
%     - detached(When): taken from pengine_detached/2.

pengine_property(Id, Prop) :-
    nonvar(Id), nonvar(Prop),
    pengine_property2(Prop, Id),
    !.
pengine_property(Id, Prop) :-
    pengine_property2(Prop, Id).

pengine_property2(self(Id), Id) :-
    current_pengine(Id, _, _, _, _, _).
pengine_property2(module(Id), Id) :-
    current_pengine(Id, _, _, _, _, _).
pengine_property2(alias(Alias), Id) :-
    child(Alias, Id),
    Alias \== Id.
pengine_property2(thread(Thread), Id) :-
    current_pengine(Id, _, Thread, _, _, _),
    Thread \== 0.
pengine_property2(remote(Server), Id) :-
    current_pengine(Id, _, 0, Server, _, _).
pengine_property2(application(Application), Id) :-
    current_pengine(Id, _, _, _, Application, _).
pengine_property2(destroy(Destroy), Id) :-
    current_pengine(Id, _, _, _, _, Destroy).
pengine_property2(parent(Parent), Id) :-
    current_pengine(Id, Parent, _, _, _, _).
pengine_property2(source(SourceID, Source), Id) :-
    pengine_data(Id, source(SourceID, Source)).
pengine_property2(detached(When), Id) :-
    pengine_detached(Id, When).
%!  pengine_output(+Term)
%
%   Send the event output(Self, Term) to the parent of the calling
%   pengine.

pengine_output(Term) :-
    pengine_self(Self),
    pengine_reply(output(Self, Term)).
console.log(Message)
if there is a console. The predicate
pengine_rpc/3 calls debug(pengine(debug), '~w', [Message])
. The debug
topic pengine(debug)
is enabled by default.
%!  pengine_debug(+Format, +Args)
%
%   Send the event debug(Self, Message) to the parent queue.  Message
%   is the result of formatting Format with Args.  If checking the
%   format/3 call with safe_goal/1 raises an exception, Message is the
%   text of that exception instead.

pengine_debug(Format, Args) :-
    pengine_parent(Queue),
    pengine_self(Self),
    catch(safe_goal(format(atom(_), Format, Args)), Error, true),
    (   nonvar(Error)
    ->  message_to_string(Error, Message)
    ;   format(atom(Message), Format, Args)
    ),
    pengine_reply(Queue, debug(Self, Message)).


/*================= Local pengine =======================
*/
%!  local_pengine_create(+Options)
%
%   Create a pengine in this process with the calling thread as its
%   parent queue.  The application defaults to `pengine_sandbox`.  The
%   child is recorded in child/2 under its `alias` option, or under
%   its own id if no alias is given.

local_pengine_create(Options) :-
    thread_self(Parent),
    option(application(Application), Options, pengine_sandbox),
    create(Parent, Child, Options, local, Application),
    option(alias(Name), Options, Child),
    assertz(child(Name, Child)).
% Hook into library(thread_pool): create the pool for a pengine
% application using its `thread_pool_size` and `thread_pool_stacks`
% settings.  The clause must be qualified with `thread_pool:` to extend
% the multifile predicate declared here.

:- multifile thread_pool:create_pool/1.

thread_pool:create_pool(Application) :-
    current_application(Application),
    setting(Application:thread_pool_size, Size),
    setting(Application:thread_pool_stacks, Stacks),
    thread_pool_create(Application, Size, Stacks).
%!  create(+Queue, ?Child, +Options, +URL, +Application)
%
%   Create a new pengine thread that replies to Queue.  For URL `local`
%   errors propagate to the caller.  For any other URL they are sent to
%   Queue as error(Child, Error).

create(Queue, Child, Options, local, Application) :-
    !,
    pengine_child_id(Child),
    create0(Queue, Child, Options, local, Application).
create(Queue, Child, Options, URL, Application) :-
    pengine_child_id(Child),
    catch(create0(Queue, Child, Options, URL, Application),
          Error,
          create_error(Queue, Child, Error)).

% Keep a given id; otherwise generate a fresh one.
pengine_child_id(Child) :-
    nonvar(Child),
    !.
pengine_child_id(Child) :-
    pengine_uuid(Child).

create_error(Queue, Child, Error) :-
    pengine_reply(Queue, error(Child, Error)).

% Verify the application exists.  Unless the pengine is created on
% behalf of HTTP, enforce the `slave_limit` setting against the number
% of child/2 entries of the calling thread.  Then start the thread in
% the application's pool, register it and tell it its id.
create0(Queue, Child, Options, URL, Application) :-
    (   current_application(Application)
    ->  true
    ;   existence_error(pengine_application, Application)
    ),
    (   URL \== http                    % pengine is _not_ a child of the
                                        % HTTP server thread
    ->  aggregate_all(count, child(_,_), Count),
        setting(Application:slave_limit, Max),
        (   Count >= Max
        ->  throw(error(resource_error(max_pengines), _))
        ;   true
        )
    ;   true
    ),
    partition(pengine_create_option, Options, PengineOptions, ThreadOptions),
    thread_create_in_pool(
        Application,
        pengine_main(Queue, PengineOptions, Application), ChildThread,
        [ at_exit(pengine_done)
        | ThreadOptions
        ]),
    option(destroy(Destroy), PengineOptions, true),
    pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy),
    thread_send_message(ChildThread, pengine_registered(Child)),
    (   option(id(Id), Options)
    ->  Id = Child
    ;   true
    ).

% Options handled by the pengine itself.  All other options are passed
% to thread_create_in_pool/4.
pengine_create_option(src_text(_)).
pengine_create_option(src_url(_)).
pengine_create_option(application(_)).
pengine_create_option(destroy(_)).
pengine_create_option(ask(_)).
pengine_create_option(template(_)).
pengine_create_option(bindings(_)).
pengine_create_option(chunk(_)).
pengine_create_option(alias(_)).
pengine_create_option(user(_)).
%!  pengine_done
%
%   Called from the pengine thread's `at_exit` option.  If the thread
%   ended with the exception `'$aborted'`, it is detached and the
%   event destroy(Pengine, abort(Pengine)) is sent to the parent.
%   Errors from sending are ignored.  Destroys child pengines using
%   pengine_destroy/1.  Cleaning up the Pengine is synchronised by the
%   `pengine_done` mutexes through protect_pengine/2.

:- public
    pengine_done/0.

pengine_done :-
    thread_self(Me),
    (   thread_property(Me, status(exception('$aborted'))),
        thread_detach(Me),
        pengine_self(Pengine)
    ->  catch(pengine_reply(destroy(Pengine, abort(Pengine))),
              error(_,_), true)
    ;   true
    ),
    forall(child(_Name, Child),
           pengine_destroy(Child)),
    pengine_self(Id),
    protect_pengine(Id, pengine_unregister(Id)).
% Pending create event, used to embed the first answer in it.
:- thread_local wrap_first_answer_in_create_event/2.

% The first argument of pengine_prepare_source/2 is module sensitive:
% its clause head is pengine_prepare_source(Module:Application, Options).
:- meta_predicate
    pengine_prepare_source(:, +).

%!  pengine_main(+Parent, +Options, +Application)
%
%   Start predicate of a pengine thread.  Waits for the message
%   pengine_registered(Self), stores Parent in the global variable
%   `pengine_parent`, and runs the pengine in a temporary module named
%   Self.  The module is set up by pengine_prepare_source/2.  If that
%   throws `prepare_source_failed` the pengine terminates.

pengine_main(Parent, Options, Application) :-
    fix_streams,
    thread_get_message(pengine_registered(Self)),
    nb_setval(pengine_parent, Parent),
    pengine_register_user(Options),
    set_prolog_flag(mitigate_spectre, true),
    catch(in_temporary_module(
              Self,
              pengine_prepare_source(Application, Options),
              pengine_create_and_loop(Self, Application, Options)),
          prepare_source_failed,
          pengine_terminate(Self)).

%   Report creation and enter the main loop.  With an ask(Query)
%   option the create event is kept pending, so the first answer is
%   embedded in it, and the query is posted to the pengine itself.
%   A query given as a string is parsed first; parse errors are
%   reported and abort creation.

pengine_create_and_loop(Self, Application, Options) :-
    setting(Application:slave_limit, SlaveLimit),
    CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
    (   option(ask(Query0), Options)
    ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
        (   string(Query0)                      % string is not callable
        ->  (   option(template(TemplateS), Options)
            ->  Ask2 = Query0-TemplateS
            ;   Ask2 = Query0
            ),
            catch(ask_to_term(Ask2, Self, Query, Template, Bindings),
                  Error, true),
            (   var(Error)
            ->  true
            ;   send_error(Error),
                throw(prepare_source_failed)
            )
        ;   Query = Query0,
            option(template(Template), Options, Query),
            option(bindings(Bindings), Options, [])
        ),
        option(chunk(Chunk), Options, 1),
        pengine_ask(Self, Query,
                    [ template(Template),
                      chunk(Chunk),
                      bindings(Bindings)
                    ])
    ;   Extra = [],
        pengine_reply(CreateEvent)
    ),
    pengine_main_loop(Self).
%!  ask_to_term(+AskSpec, +Module, -Options, -Template, -Bindings)
%
%   Parse a query given as text in the context of Module.  AskSpec is
%   either the query text or QueryText-TemplateText.  With a template,
%   Bindings holds the `Name = Var` terms of the variables that appear
%   in the template.  Without one, Template is a dict with tag
%   `swish_default_template` built from the bindings that remain after
%   excluding those accepted by anon/1.

ask_to_term(Ask-Template, Module, Ask1, Template1, Bindings) :-
    !,
    format(string(Text), 't((~s),(~s))', [Template, Ask]),
    term_string(t(Template1,Ask1), Text,
                [ variable_names(AllBindings),
                  module(Module)
                ]),
    phrase(template_bindings(Template1, AllBindings), Bindings).
ask_to_term(Ask, Module, Ask1, Template, Bindings1) :-
    term_string(Ask1, Ask,
                [ variable_names(AllBindings),
                  module(Module)
                ]),
    exclude(anon, AllBindings, Bindings1),
    dict_create(Template, swish_default_template, Bindings1).

%   template_bindings(+Term, +Bindings)//
%
%   Emit, in order of appearance, the binding of each variable
%   occurrence in Term that has a binding in Bindings.

template_bindings(Var, Bindings) -->
    { var(Var) },
    !,
    (   { var_binding(Bindings, Var, Binding) }
    ->  [Binding]
    ;   []
    ).
template_bindings([Head|Tail], Bindings) -->
    !,
    template_bindings(Head, Bindings),
    template_bindings(Tail, Bindings).
template_bindings(Compound, Bindings) -->
    { compound(Compound) },
    !,
    { compound_name_arguments(Compound, _, Args) },
    template_bindings(Args, Bindings).
template_bindings(_, _) -->
    [].

% Binding is the first element of Bindings whose second argument is
% identical to Var.
var_binding(Bindings, Var, Binding) :-
    member(Binding, Bindings),
    arg(2, Binding, Value),
    Value == Var,
    !.
%!  fix_streams
%
%   If `current_output` is a CGI stream, give `user_output` the alias
%   `current_output`.  Otherwise do nothing.

fix_streams :-
    fix_stream(current_output).

fix_stream(Name) :-
    (   is_cgi_stream(Name)
    ->  debug(pengine(stream), '~w is a CGI stream!', [Name]),
        set_stream(user_output, alias(Name))
    ;   true
    ).
%!  pengine_prepare_source(:Application, +Options)
%
%   Prepare the pengine module Module for running queries.  The
%   module's program space is limited by the application's
%   `program_space` setting, and its import module `user` is replaced
%   by Application.  The source options in Options are then loaded.
%   If loading raises an error, the error is sent to the parent and
%   `prepare_source_failed` is thrown.

pengine_prepare_source(Module:Application, Options) :-
    setting(Application:program_space, SpaceLimit),
    set_module(Module:program_space(SpaceLimit)),
    delete_import_module(Module, user),
    add_import_module(Module, Application, start),
    catch(prep_module(Module, Application, Options), Error, true),
    (   var(Error)
    ->  true
    ;   send_error(Error),
        throw(prepare_source_failed)
    ).

% Copy module-specific flags, run all prepare_module/3 hooks and
% process the source options with Module as the source module.
prep_module(Module, Application, Options) :-
    maplist(copy_flag(Module, Application), [var_prefix]),
    forall(prepare_module(Module, Application, Options), true),
    setup_call_cleanup(
        '$set_source_module'(OldModule, Module),
        maplist(process_create_option(Module), Options),
        '$set_source_module'(OldModule)).

% Copy the module-specific value of Flag from Application to Module.
% Succeeds without doing anything if Application has no value for
% Flag.  The flag must be module qualified; an unqualified unbound
% flag name would enumerate an arbitrary flag and then raise an
% instantiation error in set_prolog_flag/2.
copy_flag(Module, Application, Flag) :-
    current_prolog_flag(Application:Flag, Value),
    !,
    set_prolog_flag(Module:Flag, Value).
copy_flag(_, _, _).

% Load src_text/1 and src_url/1 options; ignore all other options.
process_create_option(Application, src_text(Text)) :-
    !,
    pengine_src_text(Text, Application).
process_create_option(Application, src_url(URL)) :-
    !,
    pengine_src_url(URL, Application).
process_create_option(_, _).
% NOTE(review): the text "src_text and src_url options" preceding this
% definition is the tail of an earlier comment whose beginning is not
% visible here.

%!  pengine_main_loop(+ID)
%
%   Run the request loop of pengine ID.  If the exception
%   `abort_query` reaches this level, pending messages on the parent
%   queue are discarded and the event abort(ID) is reported through
%   destroy_or_continue/1.

pengine_main_loop(ID) :-
    catch(guarded_main_loop(ID), abort_query, pengine_aborted(ID)).

pengine_aborted(ID) :-
    thread_self(Me),
    debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Me]),
    empty_queue,
    destroy_or_continue(abort(ID)).
%!  guarded_main_loop(+ID)
%
%   Wait for the next request and act on it.  The request `destroy`
%   terminates the pengine, ask(Goal, Options) starts a query, and
%   anything else is answered with a protocol error, after which the
%   loop continues.

guarded_main_loop(ID) :-
    pengine_request(Request),
    (   Request = destroy
    ->  debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
        pengine_terminate(ID)
    ;   Request = ask(Goal, Options)
    ->  debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
        ask(ID, Goal, Options)
    ;   debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
        pengine_reply(error(ID, error(protocol_error, _))),
        guarded_main_loop(ID)
    ).


% Report destroy(ID) to the parent and detach the calling thread.
pengine_terminate(ID) :-
    pengine_reply(destroy(ID)),
    thread_self(Me),            % Make the thread silently disappear
    thread_detach(Me).
%!  solve(+Chunk, +Template, :Goal, +ID)
%
%   Solve Goal for pengine ID and reply the results. Chunk is either an
%   integer, in which case solutions are collected in chunks through
%   findnsols_no_empty/4, or `false`. Any other value raises a domain
%   error. Each reply carries the CPU time used since the start of the
%   query or since the last `next` request.
%
%   The control flow relies on the choice point captured in Choice and
%   on the final catch-all clause, so the statement order is significant.

solve(Chunk, Template, Goal, ID) :-
    prolog_current_choice(Choice),
    (   integer(Chunk)
    ->  State = count(Chunk)
    ;   Chunk == false
    ->  State = no_chunk
    ;   domain_error(chunk, Chunk)
    ),
    statistics(cputime, Started),
    Time = time(Started),               % updated destructively on `next`
    nb_current('$variable_names', Bindings),
    filter_template(Template, Bindings, Template2),
    '$current_typein_module'(OldTypeIn),
    (   '$set_typein_module'(ID),
        call_cleanup(catch(findnsols_no_empty(State, Template2,
                                              set_projection(Goal, Bindings),
                                              Result),
                           Error, true),
                     query_done(Det, OldTypeIn)),
        arg(1, Time, Start),
        statistics(cputime, Now),
        CPUTime is Now-Start,
        (   var(Error)
        ->  projection(Projection),
            (   var(Det)                % cleanup did not run: more solutions
            ->  pengine_reply(success(ID, Result, Projection,
                                      CPUTime, true)),
                more_solutions(ID, Choice, State, Time)
            ;   !,                      % commit
                destroy_or_continue(success(ID, Result, Projection,
                                            CPUTime, false))
            )
        ;   !,                          % commit
            (   Error == abort_query
            ->  throw(Error)
            ;   destroy_or_continue(error(ID, Error))
            )
        )
    ;   !,                              % commit
        arg(1, Time, Start),
        statistics(cputime, Now),
        CPUTime is Now-Start,
        destroy_or_continue(failure(ID, CPUTime))
    ).
solve(_, _, _, _).                      % leave a choice point

%!  query_done(-Det, +TypeIn)
%
%   Cleanup handler of solve/4: unify Det with `true` and restore the
%   typein module to TypeIn.

query_done(true, TypeIn) :-
    '$set_typein_module'(TypeIn).
%!  set_projection(:Goal, +Bindings)
%
%   Call Goal with the global variable `$variable_names` set
%   (backtrackably) to Bindings.

set_projection(Goal, Bindings) :-
    b_setval('$variable_names', Bindings),
    call(Goal).

%!  projection(-Projection)
%
%   Projection is the list of variable names found in the global
%   variable `$variable_names`, or the empty list if that variable is
%   not defined.

projection(Names) :-
    nb_current('$variable_names', Bindings),
    !,
    maplist(var_name, Bindings, Names).
projection([]).
%!  filter_template(+Template0, +Bindings, -Template)
%
%   If Template0 is a dict with tag `swish_default_template`, Template
%   is a fresh dict with that tag created from Bindings. Otherwise
%   Template is Template0.

filter_template(Default, Bindings, Template) :-
    is_dict(Default, swish_default_template),
    !,
    dict_create(Template, swish_default_template, Bindings).
filter_template(Template, _Bindings, Template).

%!  findnsols_no_empty(+State, ?Template, :Goal, -List)
%
%   For State `no_chunk`, List is the singleton `[Template]` for each
%   solution of Goal. Otherwise collect solutions using findnsols/4,
%   rejecting an empty List.

findnsols_no_empty(no_chunk, Template, Goal, Answers) =>
    Answers = [Template],
    call(Goal).
findnsols_no_empty(State, Template, Goal, Answers) =>
    findnsols(State, Template, Goal, Answers),
    Answers \== [].

%!  destroy_or_continue(+Event)
%
%   Reply Event for the pengine named by its first argument. If that
%   pengine has the property destroy(true), detach the calling thread
%   and reply destroy(ID, Event). Otherwise reply Event and resume
%   guarded_main_loop/1.

destroy_or_continue(Event) :-
    arg(1, Event, Pengine),
    (   pengine_property(Pengine, destroy(true))
    ->  thread_self(Self),
        thread_detach(Self),
        pengine_reply(destroy(Pengine, Event))
    ;   pengine_reply(Event),
        guarded_main_loop(Pengine)
    ).
%!  more_solutions(+Pengine, +Choice, +State, +Time)
%
%   Called after a solution was replied while more may exist (state `6`
%   in the transition debug messages). Waits for the next request and
%   handles it:
%
%     - `stop`: report stop(Pengine) through destroy_or_continue/1.
%     - `next`: reset the start time and fail, so the caller backtracks
%       into the running goal for the next `chunk` of solutions.
%     - next(Count): as `next`, but sets the new chunk-size to Count.
%       Only accepted for Count > 0 when State is count(_).
%     - ask(Goal, Options): cut to Choice and ask the new goal.
%     - `destroy`: terminate the pengine.
%     - Anything else: reply a `protocol_error` and wait again.

more_solutions(Pengine, Choice, State, Time) :-
    pengine_request(Request),
    more_solutions(Request, Pengine, Choice, State, Time).

more_solutions(stop, Pengine, _Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 7', [Pengine, stop]),
    destroy_or_continue(stop(Pengine)).
more_solutions(next, Pengine, _Choice, _State, Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [Pengine, next]),
    statistics(cputime, Now),
    nb_setarg(1, Time, Now),
    fail.
more_solutions(next(Count), Pengine, _Choice, State, Time) :-
    Count > 0,
    State = count(_),               % else fallthrough to protocol error
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [Pengine, next(Count)]),
    nb_setarg(1, State, Count),
    statistics(cputime, Now),
    nb_setarg(1, Time, Now),
    fail.
more_solutions(ask(Goal, Options), Pengine, Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [Pengine, ask(Goal)]),
    prolog_cut_to(Choice),
    ask(Pengine, Goal, Options).
more_solutions(destroy, Pengine, _Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 1', [Pengine, destroy]),
    pengine_terminate(Pengine).
more_solutions(Request, Pengine, Choice, State, Time) :-
    debug(pengine(transition), '~q: 6 = ~q => 6',
          [Pengine, protocol_error(Request)]),
    pengine_reply(error(Pengine, error(protocol_error, _))),
    more_solutions(Pengine, Choice, State, Time).
%!  ask(+Pengine, :Goal, +Options)
%
%   Prepare Goal using prepare_goal/4 and solve it using solve/4. The
%   answer template is taken from the template(Template) option
%   (default Goal) and the chunk size from the chunk(N) option
%   (default 1). If preparing the goal raises an exception, the error is
%   replied and control returns to guarded_main_loop/1.

ask(Pengine, Goal, Options) :-
    catch(prepare_goal(Pengine, Goal, Prepared, Options), Error, true),
    !,
    (   nonvar(Error)
    ->  pengine_reply(error(Pengine, Error)),
        guarded_main_loop(Pengine)
    ;   option(template(Template), Options, Goal),
        option(chunk(Chunk), Options, 1),
        solve(Chunk, Template, Prepared, Pengine)
    ).
% Note that `expand_goal(Module:GoalIn, GoalOut)` is what we would like
% to write, but this does not work correctly if the user wishes to
% expand `X:Y` while interpreting `X` not as the module in which to run
% `Y`. This happens in the CQL package. Possibly we should disallow this
% reinterpretation?
%!  prepare_goal(+Pengine, +GoalIn, -GoalOut, +Options)
%
%   Prepare GoalIn for execution in Pengine. The variable bindings from
%   the bindings(Bindings) option (default `[]`) are stored in
%   `$variable_names`, prepare_goal/3 may rewrite the goal, and the
%   result is expanded with the pengine's module as source module.
%   Unless pengine_not_sandboxed/1 holds, the expanded goal is validated
%   by safe_goal/1 within the application's `safe_goal_limit` setting.
%
%   @throws error(sandbox(time_limit_exceeded, Limit), _) if validating
%           the goal takes longer than Limit.

prepare_goal(Pengine, Goal0, Module:Goal, Options) :-
    option(bindings(Bindings), Options, []),
    b_setval('$variable_names', Bindings),
    (   prepare_goal(Goal0, Goal1, Options)
    ->  true
    ;   Goal1 = Goal0
    ),
    get_pengine_module(Pengine, Module),
    setup_call_cleanup(
        '$set_source_module'(OldSource, Module),
        expand_goal(Goal1, Goal),
        '$set_source_module'(_, OldSource)),
    (   pengine_not_sandboxed(Pengine)
    ->  true
    ;   get_pengine_application(Pengine, Application),
        setting(Application:safe_goal_limit, Limit),
        catch(call_with_time_limit(
                  Limit,
                  safe_goal(Module:Goal)), Error, true)
    ->  (   var(Error)
        ->  true
        ;   Error = time_limit_exceeded
        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
        ;   throw(Error)
        )
    ).
%!  pengine_not_sandboxed(+Pengine)
%
%   True when Pengine does not operate in sandboxed mode. For this,
%   Pengine must have a user and an application, and
%   not_sandboxed(User, Application) must succeed.

pengine_not_sandboxed(Pengine) :-
    pengine_user(Pengine, User),
    pengine_property(Pengine, application(Application)),
    not_sandboxed(User, Application),
    !.
%!  pengine_pull_response(+Pengine, +Options)
%
%   Pull a response from Pengine if it is a remote pengine. Succeeds
%   without doing anything otherwise.

pengine_pull_response(Pengine, Options) :-
    pengine_remote(Pengine, BaseURL),
    !,
    remote_pengine_pull_response(BaseURL, Pengine, Options).
pengine_pull_response(_Local, _Options).
%!  pengine_input(+Prompt, -Term)
%
%   Send prompt(Self, Prompt) to the parent of the current pengine and
%   wait for the next request. An input(Input) request unifies Term with
%   Input, a `destroy` request calls abort/0 and any other request
%   raises a `protocol_error`.

pengine_input(Prompt, Term) :-
    pengine_self(Me),
    pengine_parent(Parent),
    pengine_reply(Parent, prompt(Me, Prompt)),
    pengine_request(Answer),
    (   Answer = input(Input)
    ->  Term = Input
    ;   Answer == destroy
    ->  abort
    ;   throw(error(protocol_error,_))
    ).
%!  pengine_respond(+Pengine, +Input, +Options)
%
%   Send Input to Pengine as an `input` event. Defined in terms of
%   pengine_send/3, as follows:
%
%   ```
%   pengine_respond(Pengine, Input, Options) :-
%       pengine_send(Pengine, input(Input), Options).
%   ```

pengine_respond(Pengine, Input, Options) :-
    Event = input(Input),
    pengine_send(Pengine, Event, Options).
%!  send_error(+Error)
%
%   Reply Error as an error event of the current pengine. Blobs in the
%   error term are replaced by atoms using replace_blobs/2. If the error
%   context holds a backtrace as a list of frames, the backtrace is
%   rendered to a string first.

send_error(error(Formal, context(prolog_stack(Frames), Message))) :-
    is_list(Frames),
    !,
    with_output_to(string(StackText),
                   print_prolog_backtrace(current_output, Frames)),
    pengine_self(Me),
    replace_blobs(Formal, SafeFormal),
    replace_blobs(Message, SafeMessage),
    pengine_reply(error(Me, error(SafeFormal,
                                  context(prolog_stack(StackText),
                                          SafeMessage)))).
send_error(Error) :-
    pengine_self(Me),
    replace_blobs(Error, SafeError),
    pengine_reply(error(Me, SafeError)).
%!  replace_blobs(+Term0, -Term)
%
%   Term is a copy of Term0 in which every blob that is not a text atom
%   is replaced by the atom obtained by printing it.

replace_blobs(Blob, Atom) :-
    blob(Blob, Type), Type \== text,
    !,
    format(atom(Atom), '~p', [Blob]).
replace_blobs(Compound0, Compound) :-
    compound(Compound0),
    !,
    compound_name_arguments(Compound0, Name, Args0),
    maplist(replace_blobs, Args0, Args),
    compound_name_arguments(Compound, Name, Args).
replace_blobs(Term, Term).


/*================= Remote pengines =======================
*/


%!  remote_pengine_create(+BaseURL, +Options)
%
%   Create a pengine on the server at BaseURL. The pengine create
%   options are posted to the server; the remaining options are passed
%   to the HTTP request. If there is an ask(Query) option without a
%   template, Query is used as template. The new pengine is recorded in
%   child/2 under its alias (default its id), registered as remote if
%   the reply is a `create` or `output` event, and the reply is
%   forwarded to the queue of the calling thread.

remote_pengine_create(BaseURL, Options) :-
    partition(pengine_create_option, Options, CreateOptions0, HTTPOptions),
    (   option(ask(Query), CreateOptions0),
        \+ option(template(_Template), CreateOptions0)
    ->  CreateOptions = [template(Query)|CreateOptions0]
    ;   CreateOptions = CreateOptions0
    ),
    options_to_dict(CreateOptions, PostData),
    remote_post_rec(BaseURL, create, PostData, Reply, HTTPOptions),
    arg(1, Reply, ID),
    (   option(id(Wanted), Options)
    ->  ID = Wanted
    ;   true
    ),
    option(alias(Alias), Options, ID),
    assert(child(Alias, ID)),
    (   (   functor(Reply, create, _)   % actually created
        ;   functor(Reply, output, _)   % compiler messages
        )
    ->  option(application(Application), CreateOptions, pengine_sandbox),
        option(destroy(Destroy), CreateOptions, true),
        pengine_register_remote(ID, BaseURL, Application, Destroy)
    ;   true
    ),
    thread_self(Queue),
    pengine_reply(Queue, Reply).

%!  options_to_dict(+Options, -Dict)
%
%   Translate the create options into a dict for posting. If both
%   ask(Ask) and template(Template) are present, they are written
%   together so they share variable names, after verifying that they
%   hold no `'$VAR'(_)` terms. Other term-valued options are written
%   using prolog_option/2.

options_to_dict(Options, Dict) :-
    select_option(ask(Ask), Options, Options1),
    select_option(template(Template), Options1, Options2),
    !,
    no_numbered_var_in(Ask+Template),
    % findall/3 protects Ask and Template against numbervars/3
    findall(AskString-TemplateString,
            ask_template_to_strings(Ask, Template, AskString, TemplateString),
            [ AskString-TemplateString ]),
    options_to_dict(Options2, Dict0),
    Dict = Dict0.put(_{ask:AskString,template:TemplateString}).
options_to_dict(Options, Dict) :-
    maplist(prolog_option, Options, Pairs),
    dict_create(Dict, _, Pairs).

%!  no_numbered_var_in(+Term)
%
%   @throws domain_error(numbered_vars_free_term, Term) if Term has a
%           subterm of the shape `'$VAR'(_)`.

no_numbered_var_in(Term) :-
    sub_term(Sub, Term),
    subsumes_term('$VAR'(_), Sub),
    !,
    domain_error(numbered_vars_free_term, Term).
no_numbered_var_in(_).

%!  ask_template_to_strings(+Ask, +Template, -AskString, -TemplateString)
%
%   Number the variables of Ask and Template together and write both as
%   quoted strings in canonical (operator-free) syntax.

ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
    numbervars(Ask+Template, 0, _),
    WriteOptions = [ numbervars(true), ignore_ops(true), quoted(true) ],
    format(string(Both), '~W\n~W', [ Ask, WriteOptions,
                                     Template, WriteOptions
                                   ]),
    split_string(Both, "\n", "", [AskString, TemplateString]).

%!  prolog_option(+Option0, -Option)
%
%   If Option0 is of type `term` according to create_option_type/2, its
%   value is replaced by its canonical text. Other options are passed
%   unmodified.

prolog_option(Option0, Option) :-
    create_option_type(Option0, term),
    !,
    Option0 =.. [Name,Value],
    format(string(Text), '~k', [Value]),
    Option =.. [Name,Text].
prolog_option(Option, Option).

%!  create_option_type(?Option, ?Type)
%
%   Type of the value of a create option.

create_option_type(ask(_),         term).
create_option_type(template(_),    term).
create_option_type(application(_), atom).

%!  remote_pengine_send(+BaseURL, +ID, +Event, +Options)
%
%   Send Event to the remote pengine ID and forward the reply to the
%   queue of the calling thread.

remote_pengine_send(BaseURL, ID, Event, Options) :-
    remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options),
    thread_self(Queue),
    pengine_reply(Queue, Reply).

%!  remote_pengine_pull_response(+BaseURL, +ID, +Options)
%
%   Pull a response from the remote pengine ID and forward the reply to
%   the queue of the calling thread.

remote_pengine_pull_response(BaseURL, ID, Options) :-
    remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options),
    thread_self(Queue),
    pengine_reply(Queue, Reply).

%!  remote_pengine_abort(+BaseURL, +ID, +Options)
%
%   Abort the remote pengine ID and forward the reply to the queue of
%   the calling thread.

remote_pengine_abort(BaseURL, ID, Options) :-
    remote_send_rec(BaseURL, abort, ID, [], Reply, Options),
    thread_self(Queue),
    pengine_reply(Queue, Reply).
%!  remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
%
%   Issue Action on pengine ID at Server and read the reply as a Prolog
%   term. If Params is `[event=Event]`, Event is posted as a Prolog
%   document; otherwise Params are added to the URL query.

remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
    !,
    server_url(Server, Action, [id=ID], URL),
    http_open(URL, In,                  % putting this in setup_call_cleanup/3
              [ post(prolog(Event))     % makes it impossible to interrupt.
              | Options
              ]),
    call_cleanup(
        read_prolog_reply(In, Reply),
        close(In)).
remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
    server_url(Server, Action, [id=ID|Params], URL),
    http_open(URL, In, Options),
    call_cleanup(
        read_prolog_reply(In, Reply),
        close(In)).

%!  remote_post_rec(+Server, +Action, +Data, -Reply, +Options)
%
%   Post Data as JSON to Action at Server, after probing the URL using
%   probe/2, and read the reply as a Prolog term.

remote_post_rec(Server, Action, Data, Reply, Options) :-
    server_url(Server, Action, [], URL),
    probe(Action, URL),
    http_open(URL, In,
              [ post(json(Data))
              | Options
              ]),
    call_cleanup(
        read_prolog_reply(In, Reply),
        close(In)).
%!  probe(+Action, +URL)
%
%   For the `create` action, first issue an HTTP OPTIONS request on URL
%   and discard the reply. Does nothing for other actions.

probe(create, URL) :-
    !,
    http_open(URL, In, [method(options)]),
    close(In).
probe(_, _).

%!  read_prolog_reply(+In, -Reply)
%
%   Read a term from the stream In as UTF-8 and restore its cycles using
%   rebind_cycles/2.

read_prolog_reply(In, Reply) :-
    set_stream(In, encoding(utf8)),
    read(In, Raw),
    rebind_cycles(Raw, Reply).

%!  rebind_cycles(+Reply0, -Reply)
%
%   If Reply0 has the shape `@(Reply, Bindings)` with Bindings a list of
%   `Var = Value` terms, execute the bindings. Otherwise Reply is
%   Reply0.

rebind_cycles(@(Reply, Bindings), Reply) :-
    is_list(Bindings),
    !,
    maplist(bind, Bindings).
rebind_cycles(Reply, Reply).

bind(Var = Value) :-
    Var = Value.

%!  server_url(+Server, +Action, +Params, -URL)
%
%   URL is the URL for Action on Server: `pengine/Action` is appended
%   to the path of Server and Params form the query string.

server_url(Server, Action, Params, URL) :-
    uri_components(Server, Parts0),
    uri_query_components(Query, Params),
    uri_data(path, Parts0, BasePath),
    atom_concat('pengine/', Action, Relative),
    directory_file_path(BasePath, Relative, Path),
    uri_data(path, Parts0, Path, Parts),
    uri_data(search, Parts, Query),
    uri_components(URL, Parts).
%!  pengine_event(?Event) is det.
%!  pengine_event(?Event, +Options) is det.
%
%   Get the next pengine event from the message queue of the calling
%   thread. Valid options are:
%
%     - listen(+Id)
%       Only accept events for the pengine Id.
%     - Options is also passed to thread_get_message/3. If that call
%       fails, e.g. due to a timeout option, Event is unified with the
%       atom `timeout`.
%
%   If Event destroys a remote pengine, its registration is removed.

pengine_event(Event) :-
    pengine_event(Event, []).

pengine_event(Event, Options) :-
    thread_self(Queue),
    option(listen(Pengine), Options, _),
    (   thread_get_message(Queue, pengine_event(Pengine, Event), Options)
    ->  true
    ;   Event = timeout
    ),
    update_remote_destroy(Event).

%!  update_remote_destroy(+Event)
%
%   Unregister the remote pengine named in Event if Event is a destroy
%   event according to destroy_event/1.

update_remote_destroy(Event) :-
    destroy_event(Event),
    arg(1, Event, Pengine),
    pengine_remote(Pengine, _Server),
    !,
    pengine_unregister_remote(Pengine).
update_remote_destroy(_).

%!  destroy_event(+Event)
%
%   True if Event is a destroy event, or a `create` event whose answer
%   is a destroy event.

destroy_event(destroy(_)).
destroy_event(destroy(_,_)).
destroy_event(create(_,Features)) :-
    memberchk(answer(First), Features),
    !,
    nonvar(First),
    destroy_event(First).
%!  pengine_event_loop(:Closure, +Options)
%
%   Process pengine events for as long as there are child pengines. For
%   each event E, the loop calls `ignore(call(Closure, E))`. A closure
%   thus acts as a handler for the event. Some events are also treated
%   specially:
%
%     - create(ID, Features): if Features holds answer(First), the
%       closure sees the create event without it and First is processed
%       next.
%     - output(ID, Term) and debug(ID, Term): pengine_pull_response/2 is
%       called after the closure.
%     - error(ID, Error): if the closure fails, all children are
%       destroyed and Error is thrown.
%     - destroy(ID): ID is removed from the children.
%     - success(ID, Sol, Proj, Time, More): the closure is called with
%       success(ID, Sol, More).
%     - failure(ID, Time): the closure is called with failure(ID).
%
%   Valid options are:
%
%     - autoforward(+To)
%       Forward received events to the children. To is either `all`,
%       `all_but_sender` or a Prolog list of NameOrIDs. Only `all` is
%       implemented.

pengine_event_loop(Closure, Options) :-
    child(_,_),
    !,
    pengine_event(Event),
    (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
    ->  forall(child(_,ID), pengine_send(ID, Event))
    ;   true
    ),
    pengine_event_loop(Event, Closure, Options).
pengine_event_loop(_, _).

% The closure is called with one additional argument (the event).
:- meta_predicate
    pengine_process_event(+, 1, -, +).

%!  pengine_event_loop(+Event, :Closure, +Options)
%
%   Process Event and continue the loop if pengine_process_event/4
%   reports that it should continue.

pengine_event_loop(Event, Closure, Options) :-
    pengine_process_event(Event, Closure, Continue, Options),
    (   Continue == true
    ->  pengine_event_loop(Closure, Options)
    ;   true
    ).

%!  pengine_process_event(+Event, :Closure, -Continue, +Options)
%
%   Handle a single event as described with pengine_event_loop/2.
%   Continue is `true` if the event loop should keep running.

pengine_process_event(create(ID, T), Closure, Continue, Options) :-
    debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(T)]),
    (   select(answer(First), T, T1)
    ->  ignore(call(Closure, create(ID, T1))),
        pengine_process_event(First, Closure, Continue, Options)
    ;   ignore(call(Closure, create(ID, T))),
        Continue = true
    ).
pengine_process_event(output(ID, Msg), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]),
    ignore(call(Closure, output(ID, Msg))),
    pengine_pull_response(ID, []).
pengine_process_event(debug(ID, Msg), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]),
    ignore(call(Closure, debug(ID, Msg))),
    pengine_pull_response(ID, []).
pengine_process_event(prompt(ID, Term), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]),
    ignore(call(Closure, prompt(ID, Term))).
pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :-
    debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]),
    ignore(call(Closure, success(ID, Sol, More))).
pengine_process_event(failure(ID, _Time), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]),
    ignore(call(Closure, failure(ID))).
pengine_process_event(error(ID, Error), Closure, Continue, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]),
    (   call(Closure, error(ID, Error))
    ->  Continue = true
    ;   forall(child(_,Child), pengine_destroy(Child)),
        throw(Error)
    ).
pengine_process_event(stop(ID), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]),
    ignore(call(Closure, stop(ID))).
pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :-
    pengine_process_event(Event, Closure, _, Options),
    pengine_process_event(destroy(ID), Closure, Continue, Options).
pengine_process_event(destroy(ID), Closure, true, _Options) :-
    retractall(child(_,ID)),
    debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]),
    ignore(call(Closure, destroy(ID))).
%!  pengine_rpc(+URL, +Query) is nondet.
%!  pengine_rpc(+URL, +Query, +Options) is nondet.
%
%   Semantically equivalent to the sequence below, except that the query
%   is executed by the pengine server at URL rather than locally.
%
%   ```
%   copy_term_nat(Query, Copy),  % attributes are not copied to the server
%   call(Copy),                  % executed on server at URL
%   Query = Copy.
%   ```
%
%   Valid options are:
%
%     - timeout(+Time)
%       Maximum time to wait for a response. Defaults to the setting
%       `pengines:time_limit`.
%
%   Remaining options (except the server option) are passed to
%   pengine_create/1.

pengine_rpc(URL, Query) :-
    pengine_rpc(URL, Query, []).

pengine_rpc(URL, Query, M:Options0) :-
    translate_local_sources(Options0, Options1, M),
    (   option(timeout(_), Options1)
    ->  Options = Options1
    ;   setting(time_limit, Limit),
        Options = [timeout(Limit)|Options1]
    ),
    term_variables(Query, Vars),
    Template =.. [v|Vars],
    State = destroy(true),              % modified by process_event/4
    setup_call_catcher_cleanup(
        pengine_create([ ask(Query),
                         template(Template),
                         server(URL),
                         id(Id)
                       | Options
                       ]),
        wait_event(Template, State, [listen(Id)|Options]),
        Why,
        pengine_destroy_and_wait(State, Id, Why)).

%!  pengine_destroy_and_wait(+State, +Id, +Why)
%
%   Cleanup for pengine_rpc/3. If State is still destroy(true), destroy
%   the pengine and wait for its destroy event. Otherwise only log Why.

pengine_destroy_and_wait(destroy(true), Id, Why) :-
    !,
    debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
    pengine_destroy(Id),
    wait_destroy(Id, 10).
pengine_destroy_and_wait(_, _, Why) :-
    debug(pengine(rpc), 'Not destroying RPC (~p)', [Why]).

%!  wait_destroy(+Id, +Retries)
%
%   Wait until pengine Id is no longer a child. Events are read with a
%   timeout of 10 and at most Retries non-destroy events are skipped
%   before the pengine is unregistered and removed regardless.

wait_destroy(Id, _) :-
    \+ child(_, Id),
    !.
wait_destroy(Id, Retries) :-
    pengine_event(Event, [listen(Id),timeout(10)]),
    !,
    (   destroy_event(Event)
    ->  retractall(child(_,Id))
    ;   succ(Left, Retries)
    ->  wait_destroy(Id, Left)
    ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
        pengine_unregister_remote(Id),
        retractall(child(_,Id))
    ).

%!  wait_event(?Template, +State, +Options)
%
%   Wait for the next event and hand it to process_event/4.

wait_event(Template, State, Options) :-
    pengine_event(Event, Options),
    debug(pengine(event), 'Received ~p', [Event]),
    process_event(Event, Template, State, Options).

%!  process_event(+Event, ?Template, +State, +Options)
%
%   Handle Event on behalf of pengine_rpc/3. Solutions are enumerated by
%   unifying Template with the members of the solution list. On a
%   destroy(ID, Event) event, State is updated destructively to
%   destroy(false).

process_event(create(_ID, Features), Template, State, Options) :-
    memberchk(answer(First), Features),
    process_event(First, Template, State, Options).
process_event(error(_ID, Error), _Template, _, _Options) :-
    throw(Error).
process_event(failure(_ID, _Time), _Template, _, _Options) :-
    fail.
process_event(prompt(ID, Prompt), Template, State, Options) :-
    pengine_rpc_prompt(ID, Prompt, Reply),
    pengine_send(ID, input(Reply)),
    wait_event(Template, State, Options).
process_event(output(ID, Term), Template, State, Options) :-
    pengine_rpc_output(ID, Term),
    pengine_pull_response(ID, Options),
    wait_event(Template, State, Options).
process_event(debug(ID, Message), Template, State, Options) :-
    debug(pengine(debug), '~w', [Message]),
    pengine_pull_response(ID, Options),
    wait_event(Template, State, Options).
process_event(success(_ID, Solutions, _Proj, _Time, false),
              Template, _, _Options) :-
    !,
    member(Template, Solutions).
process_event(success(ID, Solutions, _Proj, _Time, true),
              Template, State, Options) :-
    (   member(Template, Solutions)
    ;   pengine_next(ID, Options),
        wait_event(Template, State, Options)
    ).
process_event(destroy(ID, Event), Template, State, Options) :-
    !,
    retractall(child(_,ID)),
    nb_setarg(1, State, false),
    debug(pengine(destroy), 'State: ~p~n', [State]),
    process_event(Event, Template, State, Options).
% compatibility with older versions of the protocol.
process_event(success(ID, Solutions, Time, More),
              Template, State, Options) :-
    process_event(success(ID, Solutions, _Proj, Time, More),
                  Template, State, Options).


%!  pengine_rpc_prompt(+ID, +Prompt, -Term)
%
%   Obtain Term for a prompt from pengine ID. Tries the prompt/3 hook
%   first and falls back to reading from the terminal with Prompt.

pengine_rpc_prompt(ID, Prompt, Term) :-
    prompt(ID, Prompt, Answer),
    !,
    Term = Answer.
pengine_rpc_prompt(_ID, Prompt, Term) :-
    setup_call_cleanup(
        prompt(Old, Prompt),
        read(Term),
        prompt(_, Old)).

%!  pengine_rpc_output(+ID, +Term)
%
%   Handle output from pengine ID. Tries the output/2 hook first and
%   falls back to print/1.

pengine_rpc_output(ID, Term) :-
    output(ID, Term),
    !.
pengine_rpc_output(_ID, Term) :-
    print(Term).
% Hook: prompt(+ID, +Prompt, -Term), tried by pengine_rpc_prompt/3.
:- multifile prompt/3.

% Hook: output(+ID, +Term), tried by pengine_rpc_output/2.
:- multifile output/2.


/*================= HTTP handlers =======================
*/

%   Declare the HTTP locations we serve and how. Note that we use
%   time_limit(infinite) because pengines have their own timeout. Also
%   note that we use `spawn`. This is needed because we can easily get
%   many clients waiting for some action on a pengine to complete.
%   Without spawning, we would quickly exhaust the worker pool of the
%   HTTP server.
%
%   FIXME: probably we should wait for a short time for the pengine on
%   the default worker thread. Only if that time has expired, we can
%   call http_spawn/2 to continue waiting on a new thread. That would
%   improve the performance and reduce the usage of threads.

:- http_handler(root(pengine),
                http_404([]),
                [ id(pengines) ]).
:- http_handler(root(pengine/create),
                http_pengine_create,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(root(pengine/send),
                http_pengine_send,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(root(pengine/pull_response),
                http_pengine_pull_response,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(root(pengine/abort),       http_pengine_abort,       []).
:- http_handler(root(pengine/detach),      http_pengine_detach,      []).
:- http_handler(root(pengine/list),        http_pengine_list,        []).
:- http_handler(root(pengine/ping),        http_pengine_ping,        []).
:- http_handler(root(pengine/destroy_all), http_pengine_destroy_all, []).

:- http_handler(root(pengine/'pengines.js'),
                http_reply_file(library('http/web/js/pengines.js'), []),
                []).
:- http_handler(root(pengine/'plterm.css'),
                http_reply_file(library('http/web/css/plterm.css'), []),
                []).
% The parameters are accepted both as `application/json` and as
% `www-form-encoded`. Accepted parameters:
%
%   | Parameter   | Default         | Comment                    |
%   |-------------|-----------------|----------------------------|
%   | format      | prolog          | Output format              |
%   | application | pengine_sandbox | Pengine application        |
%   | chunk       | 1               | Chunk-size for results     |
%   | solutions   | chunked         | If `all`, emit all results |
%   | ask         | -               | The query                  |
%   | template    | -               | Output template            |
%   | src_text    | ""              | Program                    |
%   | src_url     | -               | Program to download        |
%   | disposition | -               | Download location          |
%
% Note that `solutions=all` internally uses chunking to obtain the
% results from the pengine, but the results are combined in a single
% HTTP reply. This is currently only implemented by the CSV backend that
% is part of SWISH for downloading unbounded result sets with limited
% memory resources.
%
% Using `chunk=false` simulates the recursive toplevel. See
% pengine_ask/3.
%!  http_pengine_create(+Request)
%
%   HTTP handler that creates a pengine. An OPTIONS request is answered
%   by reply_options/2. Otherwise the parameters are read either from a
%   JSON document, if the content type starts with `application/json`,
%   or from the HTTP parameters, and collected in a dict that is passed
%   to http_pengine_create/4.

http_pengine_create(Request) :-
    reply_options(Request, [post]),
    !.
http_pengine_create(Request) :-
    memberchk(content_type(ContentType), Request),
    sub_atom(ContentType, 0, _, _, 'application/json'),
    !,
    http_read_json_dict(Request, Dict),
    dict_atom_option(format, Dict, Format, prolog),
    dict_atom_option(application, Dict, Application, pengine_sandbox),
    http_pengine_create(Request, Application, Format, Dict).
http_pengine_create(Request) :-
    Optional = [optional(true)],
    OptString = [string|Optional],
    Form = [ format(Format, [default(prolog)]),
             application(Application, [default(pengine_sandbox)]),
             chunk(_, [nonneg;oneof([false]), default(1)]),
             solutions(_, [oneof([all,chunked]), default(chunked)]),
             ask(_, OptString),
             template(_, OptString),
             src_text(_, OptString),
             disposition(_, OptString),
             src_url(_, Optional)
           ],
    http_parameters(Request, Form),
    form_dict(Form, Dict),
    http_pengine_create(Request, Application, Format, Dict).

%!  dict_atom_option(+Key, +Dict, -Atom, +Default)
%
%   Atom is the value of Key in Dict converted to an atom, or Default if
%   Dict has no such key.

dict_atom_option(Key, Dict, Atom, Default) :-
    (   get_dict(Key, Dict, Text)
    ->  atom_string(Atom, Text)
    ;   Atom = Default
    ).

%!  form_dict(+Form, -Dict)
%
%   Dict holds Name-Value for each parameter declaration in Form whose
%   value (first argument) is bound.

form_dict(Form, Dict) :-
    form_values(Form, Pairs),
    dict_pairs(Dict, _, Pairs).

form_values([], []).
form_values([Field|Fields], Pairs) :-
    arg(1, Field, Value),
    nonvar(Value),
    !,
    functor(Field, Name, _),
    Pairs = [Name-Value|Rest],
    form_values(Fields, Rest).
form_values([_|Fields], Pairs) :-
    form_values(Fields, Pairs).
%!  http_pengine_create(+Request, +Application, +Format, +Dict)
%
%   Create a pengine for Application from the parameters in Dict and
%   reply its first result in Format. The request must pass allowed/2
%   and authenticate/3. The pengine gets a fresh id and an output queue
%   of at most 25 messages. If Application is not a current application,
%   an existence error is replied instead.

http_pengine_create(Request, Application, Format, Dict) :-
    current_application(Application),
    !,
    allowed(Request, Application),
    authenticate(Request, Application, UserOptions),
    dict_to_options(Dict, Application, DictOptions),
    append(UserOptions, DictOptions, CreateOptions),
    pengine_uuid(Pengine),
    message_queue_create(Queue, [max_size(25)]),
    setting(Application:time_limit, TimeLimit),
    get_time(Now),
    asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)),
    broadcast(pengine(create(Pengine, Application, CreateOptions))),
    create(Queue, Pengine, CreateOptions, http, Application),
    create_wait_and_output_result(Pengine, Queue, Format,
                                  TimeLimit, Dict),
    gc_abandoned_queues.
http_pengine_create(_Request, Application, Format, _Dict) :-
    Error = existence_error(pengine_application, Application),
    pengine_uuid(ID),
    output_result(Format, error(ID, error(Error, _))).


%!  dict_to_options(+Dict, +Application, -CreateOptions)
%
%   CreateOptions holds the key-value pairs of Dict that are valid
%   pengine create options.

dict_to_options(Dict, Application, CreateOptions) :-
    dict_pairs(Dict, _, Pairs),
    pairs_create_options(Pairs, Application, CreateOptions).

%!  pairs_create_options(+Pairs, +Application, -Options)
%
%   Translate Name-Value pairs into options, keeping only those accepted
%   by pengine_create_option/1 and dropping `user`. Values of options of
%   type `atom` are converted to atoms.

pairs_create_options([], _, []) :- !.
pairs_create_options([Name-Value0|Pairs], App, [Option|Options]) :-
    Option =.. [Name,Value],
    pengine_create_option(Option), Name \== user,
    !,
    (   create_option_type(Option, atom)
    ->  atom_string(Value, Value0)      % term creation must be done if
    ;   Value = Value0                  % we created the source and know
    ),                                  % the operators.
    pairs_create_options(Pairs, App, Options).
pairs_create_options([_|Pairs], App, Options) :-
    pairs_create_options(Pairs, App, Options).
%!  wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit)
%
%   Wait for the next pengine event on Queue and reply it in Format. If
%   no event arrives within TimeLimit, time_limit_exceeded/2 handles the
%   reply. If waiting raises an exception, died(Pengine) is replied.

wait_and_output_result(Pengine, Queue, Format, TimeLimit) :-
    (   catch(thread_get_message(Queue, pengine_event(_, Event),
                                 [ timeout(TimeLimit)
                                 ]),
              Error, true)
    ->  (   nonvar(Error)
        ->  output_result(Format, died(Pengine))
        ;   debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
            ignore(destroy_queue_from_http(Pengine, Event, Queue)),
            protect_pengine(Pengine, output_result(Format, Event))
        )
    ;   time_limit_exceeded(Pengine, Format)
    ).
%!  create_wait_and_output_result(+Pengine, +Queue, +Format,
%!                                +TimeLimit, +Dict)
%
%   Reply the results of a freshly created pengine. If Dict has
%   `solutions` set to `all`, events are emitted as page(Page, Event)
%   through output_result/3 and a `next` request is sent after every
%   event that announces more solutions, until a final event arrives.
%   Otherwise this is wait_and_output_result/4.
%
%   NOTE(review): the original documentation mentions that Dict may hold
%   a `disposition` key to denote the download location; that key is not
%   inspected here but Dict is passed on to output_result/3.

create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
    get_dict(solutions, Dict, all),
    !,
    between(1, infinite, Page),         % failure-driven loop over pages
    (   catch(thread_get_message(Queue, pengine_event(_, Event),
                                 [ timeout(TimeLimit)
                                 ]),
              Error, true)
    ->  (   var(Error)
        ->  debug(pengine(wait), 'Page ~D: got ~q from ~q',
                  [Page, Event, Queue]),
            (   destroy_queue_from_http(Pengine, Event, Queue)
            ->  !,
                protect_pengine(Pengine,
                                output_result(Format, page(Page, Event), Dict))
            ;   is_more_event(Event)
            ->  pengine_thread(Pengine, Thread),
                thread_send_message(Thread, pengine_request(next)),
                protect_pengine(Pengine,
                                output_result(Format, page(Page, Event), Dict)),
                fail                    % continue with the next page
            ;   !,
                protect_pengine(Pengine,
                                output_result(Format, page(Page, Event), Dict))
            )
        ;   !, output_result(Format, died(Pengine))
        )
    ;   !, time_limit_exceeded(Pengine, Format)
    ),
    !.
create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, _Dict) :-
    wait_and_output_result(Pengine, Queue, Format, TimeLimit).

%!  is_more_event(+Event)
%
%   True if Event is a success event announcing more solutions, or a
%   `create` event whose answer is such an event.

is_more_event(success(_Id, _Answers, _Projection, _Time, true)).
is_more_event(create(_, Features)) :-
    memberchk(answer(First), Features),
    is_more_event(First).
%!  time_limit_exceeded(+Pengine, +Format)
%
%   Forcefully destroy Pengine and, as cleanup, reply a destroy event
%   that wraps a `time_limit_exceeded` error in Format.

time_limit_exceeded(Pengine, Format) :-
    Reply = destroy(Pengine, error(Pengine, time_limit_exceeded)),
    call_cleanup(
        pengine_destroy(Pengine, [force(true)]),
        output_result(Format, Reply)).
%!  destroy_queue_from_http(+Pengine, +Event, +Queue)
%
%   Consider destroying the output queue Queue of Pengine. If Queue is
%   already registered in output_queue/3, it is destroyed once it is
%   empty. Otherwise, if Event is a destroy event, destruction is
%   synchronised through the `pengine` mutex. Fails if neither applies.

destroy_queue_from_http(Pengine, _, Queue) :-
    output_queue(Pengine, Queue, _),
    !,
    destroy_queue_if_empty(Queue).
destroy_queue_from_http(Pengine, Event, Queue) :-
    debug(pengine(destroy), 'DESTROY? ~p', [Event]),
    is_destroy_event(Event),
    !,
    message_queue_property(Queue, size(Pending)),
    debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Pending]),
    with_mutex(pengine, sync_destroy_queue_from_http(Pengine, Queue)).

%!  is_destroy_event(+Event)
%
%   True if Event is a destroy event, or a `create` event whose answer
%   is a destroy event.

is_destroy_event(destroy(_)).
is_destroy_event(destroy(_,_)).
is_destroy_event(create(_, Features)) :-
    memberchk(answer(First), Features),
    is_destroy_event(First).

%!  destroy_queue_if_empty(+Queue)
%
%   Destroy Queue and forget it in output_queue/3, unless it still holds
%   a message.

destroy_queue_if_empty(Queue) :-
    thread_peek_message(Queue, _),
    !.
destroy_queue_if_empty(Queue) :-
    retractall(output_queue(_, Queue, _)),
    message_queue_destroy(Queue).
:- dynamic
    last_gc/1.                          % last_gc(Time): time of the last GC

%!  gc_abandoned_queues
%
%   If consider_queue_gc/0 holds, destroy all queues registered in
%   output_queue/3 for more than 15 minutes and record the time of this
%   collection. Always succeeds.

gc_abandoned_queues :-
    consider_queue_gc,
    !,
    get_time(Now),
    (   output_queue(_, Queue, Created),
        Now-Created > 15*60,
        retract(output_queue(_, Queue, Created)),
        message_queue_destroy(Queue),
        fail                            % failure-driven loop
    ;   retractall(last_gc(_)),
        asserta(last_gc(Now))
    ).
gc_abandoned_queues.

%!  consider_queue_gc
%
%   True if output_queue/3 has more than 100 clauses and the last
%   collection was more than 5 minutes ago or never happened.

consider_queue_gc :-
    predicate_property(output_queue(_,_,_), number_of_clauses(Count)),
    Count > 100,
    (   last_gc(Last),
        get_time(Now),
        Now-Last > 5*60
    ->  true
    ;   \+ last_gc(_)
    ).
:- dynamic output_queue_destroyed/1.

%!  sync_destroy_queue_from_http(+Pengine, +Queue)
%
%   Destroy Queue on behalf of the HTTP side. destroy_queue_from_http/3
%   calls this with the `pengine` mutex held.
%
%     - If Queue is registered in output_queue/3, it is destroyed once
%       it is empty.
%     - If Queue still holds an output event, destruction is delayed by
%       registering Queue in output_queue/3.
%     - Otherwise Queue is destroyed and recorded in
%       output_queue_destroyed/1.

sync_destroy_queue_from_http(Pengine, Queue) :-
    (   output_queue(Pengine, Queue, _)
    ->  destroy_queue_if_empty(Queue)
    ;   thread_peek_message(Queue, pengine_event(_, output(_,_)))
    ->  debug(pengine(destroy), 'Delay destruction of ~p because of output',
              [Queue]),
        get_time(Now),
        asserta(output_queue(Pengine, Queue, Now))
    ;   message_queue_destroy(Queue),
        asserta(output_queue_destroyed(Queue))
    ).
%!  sync_destroy_queue_from_pengine(+Pengine, +Queue)
%
%   Called from the pengine side to release Queue. If the HTTP side
%   already destroyed the queue, only that record is removed. Otherwise
%   Queue is registered in output_queue/3 with the current time. In both
%   cases the pengine_queue/4 fact is removed.
%
%   NOTE(review): the original documentation states this is called with
%   the `pengine` mutex held; the caller is not visible here.

sync_destroy_queue_from_pengine(Pengine, Queue) :-
    (   retract(output_queue_destroyed(Queue))
    ->  true
    ;   get_time(Now),
        asserta(output_queue(Pengine, Queue, Now))
    ),
    retractall(pengine_queue(Pengine, Queue, _, _)).


%!  http_pengine_send(+Request)
%
%   HTTP handler that sends an event to a pengine. The parameters are
%   `id`, an optional `event` and `format` (default `prolog`). After
%   reading the event it is sent to the pengine's thread and the next
%   result is replied. If the pengine does not exist, an existence error
%   is replied in the requested format.

http_pengine_send(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_send(Request) :-
    http_parameters(Request,
                    [ id(Pengine, [ type(atom) ]),
                      event(EventString, [optional(true)]),
                      format(Format, [default(prolog)])
                    ]),
    catch(read_event(Pengine, Request, Format, EventString, Event),
          Error,
          true),
    (   var(Error)
    ->  debug(pengine(event), 'HTTP send: ~p', [Event]),
        (   pengine_thread(Pengine, Thread)
        ->  pengine_queue(Pengine, Queue, TimeLimit, _),
            random_delay,
            broadcast(pengine(send(Pengine, Event))),
            thread_send_message(Thread, pengine_request(Event)),
            wait_and_output_result(Pengine, Queue, Format, TimeLimit)
        ;   atom(Pengine)
        ->  pengine_died(Format, Pengine)
        ;   http_404([], Request)
        )
    ;   Error = error(existence_error(pengine, Pengine), _)
    ->  pengine_died(Format, Pengine)
    ;   output_result(Format, error(Pengine, Error))
    ).

%!  pengine_died(+Format, +Pengine)
%
%   Reply in Format that Pengine does not exist.

pengine_died(Format, Pengine) :-
    output_result(Format, error(Pengine,
                                error(existence_error(pengine, Pengine),_))).
%!  read_event(+Pengine, +Request, +Format, +EventString, -Event)
%
%   Read the event for Pengine in the context of the pengine's module,
%   running under protect_pengine/2, and adjust it for Format using
%   fix_bindings/4. If that fails, the posted data is discarded and an
%   existence error for Pengine is raised.
%
%   NOTE(review): the original documentation refers to the
%   `pengine_done` mutex; presumably protect_pengine/2 holds it while
%   the module is in use -- confirm against its definition.

read_event(Pengine, Request, Format, EventString, Event) :-
    protect_pengine(
        Pengine,
        (   get_pengine_module(Pengine, Module),
            read_event_2(Request, EventString, Module, Event0, Bindings)
        )),
    !,
    fix_bindings(Format, Event0, Bindings, Event).
read_event(Pengine, Request, _Format, _EventString, _Event) :-
    debug(pengine(event), 'Pengine ~q vanished', [Pengine]),
    discard_post_data(Request),
    existence_error(pengine, Pengine).
%!  read_event_2(+Request, +EventString, +Module, -Event, -Bindings)
%
%   Read the event either from the `event` parameter or as a posted
%   document. In both cases the term is read in the context of Module
%   and Bindings holds its variable names.

read_event_2(_Request, EventString, Module, Event, Bindings) :-
    nonvar(EventString),
    !,
    term_string(Event, EventString,
                [ variable_names(Bindings),
                  module(Module)
                ]).
read_event_2(Request, _EventString, Module, Event, Bindings) :-
    option(method(post), Request),
    http_read_data(Request, Event,
                   [ content_type('application/x-prolog'),
                     module(Module),
                     variable_names(Bindings)
                   ]).
%!  discard_post_data(+Request)
%
%   If Request is a POST request, read its data into a null stream.
%   Succeeds without doing anything for other requests.

discard_post_data(Request) :-
    option(method(post), Request),
    !,
    setup_call_cleanup(
        open_null_stream(Sink),
        http_read_data(Request, _, [to(stream(Sink))]),
        close(Sink)).
discard_post_data(_).
%!  fix_bindings(+Format, +EventIn, +Bindings, -Event)
%
%   Generate the template for the json(-s) Format from the variables in
%   the asked Goal. Variables starting with an underscore, followed by a
%   capital letter are ignored from the template. For an ask/2 event in
%   a JSON format, the options are extended with the template, the
%   chunk size (default 1) and the named bindings. Other events are
%   passed unmodified.

fix_bindings(Format,
             ask(Goal, Options0), Bindings,
             ask(Goal, Options)) :-
    json_lang(Format),
    !,
    exclude(anon, Bindings, Named),
    template(Named, Template, Options0, Options1),
    select_option(chunk(Chunk), Options1, Options2, 1),
    Options = [ template(Template),
                chunk(Chunk),
                bindings(Named)
              | Options2
              ].
fix_bindings(_, Event, _, Event).

%!  template(+Bindings, -Template, +Options0, -Options)
%
%   Template is taken from the template/1 option if present, which is
%   then removed from the options. Otherwise it is a
%   `swish_default_template` dict created from Bindings.

template(_, Template, Options0, Options) :-
    select_option(template(Template), Options0, Options),
    !.
template(Bindings, Template, Options, Options) :-
    dict_create(Template, swish_default_template, Bindings).

%!  anon(+Binding)
%
%   True if Binding is `Name=_` and Name starts with an underscore
%   followed by a character that can start a Prolog variable.

anon(Name=_) :-
    sub_atom(Name, 0, _, _, '_'),
    sub_atom(Name, 1, 1, _, Second),
    char_type(Second, prolog_var_start).

%!  var_name(+Binding, -Name)
%
%   Name is the variable name of a `Name=Var` binding.

var_name(Name=_, Name).
%!  json_lang(+Format) is semidet.
%
%   True if Format is the atom `json` or starts with `json-`.

json_lang(json) :-
    !.
json_lang(Lang) :-
    sub_atom(Lang, 0, _, _, 'json-').
%!  http_pengine_pull_response(+Request)
%
%   HTTP handler that waits for and outputs the next result of the
%   pengine identified by the `id` parameter, using the `format`
%   parameter (default `prolog`). An OPTIONS request is answered by
%   reply_options/2. The pengine is first passed to reattach/1. If no
%   queue can be found for the id, a 404 reply is generated.

http_pengine_pull_response(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_pull_response(Request) :-
    http_parameters(Request,
                    [ id(ID, []),
                      format(Format, [default(prolog)])
                    ]),
    reattach(ID),
    (   pull_response_queue(ID, Queue, TimeLimit)
    ->  wait_and_output_result(ID, Queue, Format, TimeLimit)
    ;   http_404([], Request)
    ).

%   pull_response_queue(+ID, -Queue, -TimeLimit) is semidet.
%
%   Prefer the queue registered by pengine_queue/4. Otherwise fall
%   back to output_queue/3 with a time limit of 0.

pull_response_queue(ID, Queue, TimeLimit) :-
    pengine_queue(ID, Queue, TimeLimit, _),
    !.
pull_response_queue(ID, Queue, 0) :-
    output_queue(ID, Queue, _).
%!  http_pengine_abort(+Request)
%
%   HTTP handler that aborts the pengine identified by the `id`
%   parameter. An OPTIONS request is answered by reply_options/2. If
%   the pengine has a thread, pengine(abort(ID)) is broadcast, pending
%   output is aborted, the pengine is aborted and JSON `true` is
%   replied. Otherwise a 404 reply is generated.

http_pengine_abort(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_abort(Request) :-
    http_parameters(Request, [id(ID, [])]),
    (   pengine_thread(ID, _)
    ->  broadcast(pengine(abort(ID))),
        abort_pending_output(ID),
        pengine_abort(ID),
        reply_json(true)
    ;   http_404([], Request)
    ).
%!  http_pengine_detach(+Request)
%
%   HTTP handler that detaches the pengine identified by the `id`
%   parameter. The posted JSON document is read as a dict and stored,
%   extended with the key `time`, in pengine_detached/2. The request
%   must pass allowed/2 and authenticate/3 for the application of the
%   pengine; otherwise a 404 reply is generated. On success the
%   maximum size of the pengine queue is set to 1000, detached(ID) is
%   sent to the queue and JSON `true` is replied.

http_pengine_detach(Request) :-
    reply_options(Request, [post]),
    !.
http_pengine_detach(Request) :-
    http_parameters(Request, [id(ID, [])]),
    http_read_json_dict(Request, ClientData),
    (   pengine_property(ID, application(App)),
        allowed(Request, App),
        authenticate(Request, App, _UserOptions)
    ->  broadcast(pengine(detach(ID))),
        get_time(Stamp),
        Detached = ClientData.put(time, Stamp),
        assertz(pengine_detached(ID, Detached)),
        pengine_queue(ID, Queue, _TimeLimit, _Created),
        message_queue_set(Queue, max_size(1000)),
        pengine_reply(Queue, detached(ID)),
        reply_json(true)
    ;   http_404([], Request)
    ).

% Fallback for systems that do not provide message_queue_set/2.
:- if(\+current_predicate(message_queue_set/2)).
message_queue_set(_,_).
:- endif.

%!  reattach(+ID) is det.
%
%   If ID is registered as detached, remove that registration and set
%   the maximum size of its queue to 25. Always succeeds.

reattach(ID) :-
    (   retract(pengine_detached(ID, _)),
        pengine_queue(ID, Queue, _, _)
    ->  message_queue_set(Queue, max_size(25))
    ;   true
    ).
%!  http_pengine_destroy_all(+Request)
%
%   HTTP handler that destroys a set of pengines. The `ids` parameter
%   is a comma-separated list of pengine ids. Each id that is not
%   registered in pengine_detached/2 is destroyed with force(true).
%   Replies with the JSON string "ok".

http_pengine_destroy_all(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_destroy_all(Request) :-
    http_parameters(Request, [ids(IDsAtom, [])]),
    atomic_list_concat(IDs, ',', IDsAtom),
    forall(( member(ID, IDs),
             \+ pengine_detached(ID, _)     % leave detached pengines alone
           ),
           pengine_destroy(ID, [force(true)])),
    reply_json("ok").
%!  http_pengine_ping(+Request)
%
%   HTTP handler that reports whether a pengine is alive. If the
%   pengine named by the `id` parameter has a thread whose statistics
%   can be obtained, the event ping(Pengine, Stats) is output, where
%   Stats is the return of thread_statistics/2. Otherwise the event
%   died(Pengine) is output. The output format is taken from the
%   `format` parameter (default `prolog`).
%
%   NOTE(review): the earlier documentation named the event
%   status(Pengine, Stats); the code emits ping(Pengine, Stats).

http_pengine_ping(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_ping(Request) :-
    http_parameters(Request,
                    [ id(Pengine, []),
                      format(Format, [default(prolog)])
                    ]),
    (   pengine_thread(Pengine, Thread),
        AnyError = error(_,_),
        catch(thread_statistics(Thread, Stats), AnyError, fail)
    ->  output_result(Format, ping(Pengine, Stats))
    ;   output_result(Format, died(Pengine))
    ).
%!  http_pengine_list(+Request)
%
%   HTTP handler that lists pengines as a JSON object with the key
%   `pengines`. The `status` parameter only accepts `detached` (also
%   the default); `application` defaults to `pengine_sandbox`. The
%   request must pass allowed/2 and authenticate/3.

http_pengine_list(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_list(Request) :-
    http_parameters(Request,
                    [ status(Status, [default(detached), oneof([detached])]),
                      application(App, [default(pengine_sandbox)])
                    ]),
    allowed(Request, App),
    authenticate(Request, App, _UserOptions),
    findall(Info, listed_pengine(App, Status, Info), Infos),
    reply_json(json{pengines: Infos}).

%   listed_pengine(+Application, +Status, -State) is nondet.
%
%   State is a dict describing a detached pengine of Application: its
%   id, the detached property, the current size of its queue and its
%   thread statistics, or thread{status:died} if these cannot be
%   obtained.

listed_pengine(Application, detached, State) :-
    State = pengine{id:Id,
                    detached:Since,
                    queued:QueueSize,
                    stats:ThreadStats},
    pengine_property(Id, application(Application)),
    pengine_property(Id, detached(Since)),
    pengine_queue(Id, Queue, _TimeLimit, _Created),
    message_queue_property(Queue, size(QueueSize)),
    (   pengine_thread(Id, Thread),
        catch(thread_statistics(Thread, ThreadStats), _, fail)
    ->  true
    ;   ThreadStats = thread{status:died}
    ).
%!  output_result(+Format, +Event) is det.
%!  output_result(+Format, +Event, +OptionsDict) is det.
%
%   Emit Event to the HTTP client. Format is one of `prolog`, `json`
%   or `json-s`. While the reply is being written, the replying thread
%   is registered in pengine_replying/2; the registration is removed
%   again when writing ends. A thrown `pengine_abort_output` is caught
%   and ends the output silently.
%
%   output_result/3 first tries write_result/3. If that does not
%   succeed, `prolog` is written using write_term/2 and JSON formats
%   are translated with event_term_to_json_data/3.
%
%   @error domain_error(pengine_format, Format) for any other format.

:- dynamic
    pengine_replying/2.                 % +Pengine, +Thread

output_result(Format, Event) :-
    arg(1, Event, Pengine),             % first argument is the pengine
    thread_self(Me),
    cors_enable,                        % contingent on http:cors setting
    disable_client_cache,
    setup_call_cleanup(
        asserta(pengine_replying(Pengine, Me), ClauseRef),
        catch(output_result(Format, Event, _{}),
              pengine_abort_output,
              true),
        erase(ClauseRef)).

output_result(Lang, Event, Dict) :-
    write_result(Lang, Event, Dict),
    !.
output_result(prolog, Event, _) :-
    !,
    format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
    WriteOptions = [ quoted(true),
                     ignore_ops(true),
                     fullstop(true),
                     blobs(portray),
                     portray_goal(portray_blob),
                     nl(true)
                   ],
    write_term(Event, WriteOptions).
output_result(Lang, Event, _) :-
    json_lang(Lang),
    !,
    (   event_term_to_json_data(Event, JSON, Lang)
    ->  reply_json(JSON)
    ;   % Conversion failed: re-run it under assertion/1.
        assertion(event_term_to_json_data(Event, _, Lang))
    ).
output_result(Lang, _Event, _) :-       % FIXME: allow for non-JSON format
    domain_error(pengine_format, Lang).
%!  portray_blob(+Blob, +Options) is det.
%
%   Portray a blob as '$BLOB'(Type). Future versions may include more
%   info, depending on Type. Passed as portray_goal(portray_blob) to
%   write_term/2.

:- public portray_blob/2.               % called from write-term
portray_blob(Blob, _Options) :-
    blob(Blob, BlobType),
    writeq('$BLOB'(BlobType)).
%!  abort_pending_output(+Pengine) is det.
%
%   Signal every thread registered in pengine_replying/2 for Pengine
%   to throw `pengine_abort_output`.

abort_pending_output(Pengine) :-
    forall(pengine_replying(Pengine, Replier),
           abort_output_thread(Replier)).

%   abort_output_thread(+Thread)
%
%   Make Thread throw `pengine_abort_output`. An existence error for
%   the thread is silently ignored.

abort_output_thread(Replier) :-
    catch(thread_signal(Replier, throw(pengine_abort_output)),
          error(existence_error(thread, _), _),
          true).
%   Supported output formats are `prolog` and various JSON dialects.
%   The hook event_to_json/3 can be used to refine the JSON dialects.
%   This hook must be used if a completely different output format is
%   desired.

%!  disable_client_cache
%
%   Write the HTTP header fields Cache-Control, Pragma and Expires
%   with values that tell the client not to cache the reply.

disable_client_cache :-
    format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c
            Pragma: no-cache\r\n\c
            Expires: 0\r\n').

%!  event_term_to_json_data(+Event, -JSON, +Lang) is semidet.
%
%   Translate a pengine event term into a JSON dict.
%
%     1. The hook event_to_json/3 is tried first.
%     2. success/5 is only handled here for Lang `json`.
%     3. destroy(ID, Event) and the answer(First) feature of
%        create(ID, Features) are translated recursively.
%     4. error(ID, ErrorTerm) carries the printed message in `data`,
%        extended by add_error_details/3.
%     5. Any other event of arity 1 or 2 is mapped to its functor
%        name, id and (for arity 2) the JSON translation of its
%        second argument.
%
%   A second, identical clause for destroy(ID, Event) used to follow
%   the create/2 clause. It was unreachable because the first destroy
%   clause commits with a cut, and has been removed.

event_term_to_json_data(Event, JSON, Lang) :-
    event_to_json(Event, JSON, Lang),
    !.
event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
                        json{event:success, id:ID, time:Time,
                             data:Bindings, more:More, projection:Projection},
                        json) :-
    !,
    term_to_json(Bindings0, Bindings).
event_term_to_json_data(destroy(ID, Event),
                        json{event:destroy, id:ID, data:JSON},
                        Style) :-
    !,
    event_term_to_json_data(Event, JSON, Style).
event_term_to_json_data(create(ID, Features0), JSON, Style) :-
    !,
    (   select(answer(First0), Features0, Features1)
    ->  event_term_to_json_data(First0, First, Style),
        Features = [answer(First)|Features1]
    ;   Features = Features0
    ),
    dict_create(JSON, json, [event(create), id(ID)|Features]).
event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
    !,
    % Message is still unbound here; it is filled in below.
    Error0 = json{event:error, id:ID, data:Message},
    add_error_details(ErrorTerm, Error0, Error),
    message_to_string(ErrorTerm, Message).
event_term_to_json_data(failure(ID, Time),
                        json{event:failure, id:ID, time:Time}, _) :-
    !.
event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
    functor(EventTerm, F, 1),
    !,
    arg(1, EventTerm, ID).
event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
    functor(EventTerm, F, 2),
    arg(1, EventTerm, ID),
    arg(2, EventTerm, Data),
    term_to_json(Data, JSON).

:- public add_error_details/3.
%!  add_error_details(+Error, +JSON0, -JSON)
%
%   Extend the dict JSON0 with the error code (add_error_code/3) and
%   the error location (add_error_location/3) of Error.
%
%   NOTE(review): the original documentation mentions pengines_io.pl
%   here, presumably as another caller of this predicate; confirm.

add_error_details(Error, JSON0, JSON) :-
    add_error_code(Error, JSON0, WithCode),
    add_error_location(Error, WithCode, JSON).
%!  add_error_code(+Error, +JSON0, -JSON) is det.
%
%   Add a `code` field to JSON0 if Error is an ISO error term. The
%   error code is the functor name of the formal part of the error,
%   e.g., `syntax_error`, `type_error`, etc. Some errors carry more
%   information:
%
%     - existence_error(Type, Obj) with atom Type
%       Adds `arg1` (Type) and `arg2` (Obj made atomic by
%       to_atomic/2) next to the code.
%
%   Any other term leaves the dict unchanged.

add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :-
    atom(Type),
    !,
    to_atomic(Obj, Value),
    put_dict(_{code:existence_error, arg1:Type, arg2:Value}, Error0, Error).
add_error_code(error(Formal, _), Error0, Error) :-
    callable(Formal),
    !,
    functor(Formal, Code, _),
    put_dict(code, Error0, Code, Error).
add_error_code(_, Error, Error).

%   to_atomic(+Obj, -Atomic)
%
%   Atoms, numbers and strings are passed unchanged. Any other term is
%   converted to a string using term_string/2.

% What to do with large integers?
to_atomic(Obj, Atomic) :-
    (   (   atom(Obj)
        ;   number(Obj)
        ;   string(Obj)
        )
    ->  Atomic = Obj
    ;   term_string(Obj, Atomic)
    ).
%!  add_error_location(+Error, +JSON0, -JSON) is det.
%
%   Add a `location` property if the error can be associated with a
%   source location. The location is an object with properties `file`
%   and `line` and, if available, the character location in the line
%   (`ch`). The context must have the shape file(Path, Line, Ch, _)
%   with an atom Path and an integer Line; a Ch of -1 means that no
%   `ch` is added. Any other term leaves the dict unchanged.

add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :-
    atom(Path),
    integer(Line),
    !,
    put_dict(_{location:_{file:Path, line:Line}}, Term0, Term).
add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :-
    atom(Path),
    integer(Line),
    integer(Ch),
    !,
    put_dict(_{location:_{file:Path, line:Line, ch:Ch}}, Term0, Term).
add_error_location(_, Term, Term).
%   The hook event_to_json/3 can be used to translate a Prolog event
%   such as success(ID, Bindings, Projection, Time, More) and
%   output(ID, Term) into a format suitable for processing at the
%   client side.

%:- multifile pengines:event_to_json/3.


                 /*******************************
                 *        ACCESS CONTROL        *
                 *******************************/
%!  allowed(+Request, +Application) is det.
%
%   Check whether the peer is allowed to connect. The peer must match
%   the setting Application:allow_from and must not match the setting
%   Application:deny_from. Returns a `forbidden` header if contact is
%   not allowed.
%
%   @throws http_reply(forbidden(RequestURI)) if contact is refused.

allowed(Request, Application) :-
    setting(Application:allow_from, Allow),
    match_peer(Request, Allow),
    setting(Application:deny_from, Deny),
    \+ match_peer(Request, Deny),
    !.
allowed(Request, _Application) :-
    memberchk(request_uri(Here), Request),
    throw(http_reply(forbidden(Here))).

%   match_peer(+Request, +Patterns) is semidet.
%
%   True if Patterns contains `*`, the literal peer address, or a
%   pattern that matches the peer according to match_peer_pattern/2.
%   An empty list never matches.

match_peer(_, Allowed) :-
    memberchk(*, Allowed),
    !.
match_peer(_, []) :- !, fail.
match_peer(Request, Allowed) :-
    http_peer(Request, Peer),
    debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
    (   memberchk(Peer, Allowed)
    ->  true
    ;   member(Pattern, Allowed),
        match_peer_pattern(Pattern, Peer)
    ).

%   match_peer_pattern(+Pattern, +Peer) is semidet.
%
%   True if the dotted address Peer matches Pattern. Both are
%   converted by ip_term/2 and must unify.

match_peer_pattern(Pattern, Peer) :-
    ip_term(Pattern, IP),
    ip_term(Peer, IP),
    !.

%   ip_term(+Text, -Parts) is semidet.
%
%   Split Text on "." and convert the parts to a list of numbers. A
%   final part `*` leaves the remainder of the list unbound, so it
%   unifies with any tail.

ip_term(Peer, Pattern) :-
    split_string(Peer, ".", "", PartStrings),
    ip_pattern(PartStrings, Pattern).

ip_pattern([], []).
ip_pattern([Last], _) :-
    % split_string/4 produces strings, so the wildcard arrives as "*".
    % The atom is accepted as well for lists that were built by hand.
    (   Last == "*"
    ;   Last == (*)
    ),
    !.
ip_pattern([S|T0], [N|T]) :-
    number_string(N, S),
    ip_pattern(T0, T).
%!  authenticate(+Request, +Application, -UserOptions:list) is det.
%
%   Call authentication_hook/3, returning either `[user(User)]`, `[]`
%   or an exception. UserOptions is `[]` if the hook does not
%   succeed.
%
%   @error instantiation_error if the hook returns a non-ground User.

authenticate(Request, Application, UserOptions) :-
    (   authentication_hook(Request, Application, User)
    ->  must_be(ground, User),
        UserOptions = [user(User)]
    ;   UserOptions = []
    ).
%   An authentication hook may raise one of these exceptions:
%
%     - throw(http_reply(authorise(basic(Realm))))
%       Start a normal HTTP login challenge (reply 401).
%     - throw(http_reply(forbidden(Path)))
%       Reject the request using a 403 reply.

%!  pengine_register_user(+Options) is det.
%
%   If Options contains user(User), record User for the calling
%   pengine in pengine_user/2. Otherwise do nothing.

pengine_register_user(Options) :-
    (   option(user(User), Options)
    ->  pengine_self(Self),
        asserta(pengine_user(Self, User))
    ;   true
    ).
%!  pengine_user(-User) is semidet.
%
%   True when User is recorded in pengine_user/2 for the calling
%   pengine, as obtained from pengine_self/1.

pengine_user(User) :-
    pengine_self(Self),
    pengine_user(Self, User).
%!  reply_options(+Request, +Methods) is semidet.
%
%   Reply to an HTTP OPTIONS request: enable CORS for Methods and emit
%   a text/plain reply with an empty body. Fails if the request method
%   is not `options`, so that the calling handler continues with its
%   next clause.

reply_options(Request, Methods) :-
    option(method(options), Request),
    !,
    cors_enable(Request, [methods(Methods)]),
    format('Content-type: text/plain\r\n'),
    format('~n').                       % empty body


                 /*******************************
                 *        COMPILE SOURCE        *
                 *******************************/
%!  pengine_src_text(+SrcText, +Module) is det.
%
%   Load SrcText into Module. The text is loaded through a character
%   stream under the source id `pengine://<Self>/src`, where Self is
%   the calling pengine. The load is silent and uses the options from
%   extra_load_options/2. Afterwards keep_source/3 is called with the
%   same text.

pengine_src_text(Src, Module) :-
    pengine_self(Self),
    format(atom(ID), 'pengine://~w/src', [Self]),
    extra_load_options(Self, Options),
    setup_call_cleanup(
        open_chars_stream(Src, Stream),
        load_files(Module:ID,
                   [ stream(Stream),
                     module(Module),
                     silent(true)
                   | Options
                   ]),
        close(Stream)),
    keep_source(Self, ID, Src).

%   system:'#file'(+File, +Line)
%
%   Set the file name of the stream that is currently being loaded to
%   File and toggle record_position off and on again. The clause is
%   defined in module `system`; without the `system:` qualifier the
%   head is not valid Prolog syntax.
%
%   NOTE(review): toggling record_position presumably resets the
%   stream position information; confirm.

system:'#file'(File, _Line) :-
    prolog_load_context(stream, Stream),
    set_stream(Stream, file_name(File)),
    set_stream(Stream, record_position(false)),
    set_stream(Stream, record_position(true)).
%!  pengine_src_url(+URL, +Module) is det.
%
%   Load the source found at URL into Module under the source id
%   `pengine://<Self>/url/<Path>`. If the setting
%   Application:debug_info is `false`, the code is loaded directly
%   from the HTTP stream. Otherwise the text is first read into a
%   string, loaded from that string and passed to keep_source/3. In
%   both cases the HTTP stream is read as UTF-8.

pengine_src_url(URL, Module) :-
    pengine_self(Self),
    uri_encoded(path, URL, Path),
    format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
    extra_load_options(Self, Extra),
    (   get_pengine_application(Self, Application),
        setting(Application:debug_info, false)
    ->  setup_call_cleanup(
            http_open(URL, In, []),
            (   set_stream(In, encoding(utf8)),
                load_files(Module:ID,
                           [ stream(In),
                             module(Module)
                           | Extra
                           ])
            ),
            close(In))
    ;   setup_call_cleanup(
            http_open(URL, Remote, []),
            (   set_stream(Remote, encoding(utf8)),
                read_string(Remote, _, Text)
            ),
            close(Remote)),
        setup_call_cleanup(
            open_chars_stream(Text, In),
            load_files(Module:ID,
                       [ stream(In),
                         module(Module)
                       | Extra
                       ]),
            close(In)),
        keep_source(Self, ID, Text)
    ).


%   extra_load_options(+Pengine, -Options)
%
%   Options is `[]` if pengine_not_sandboxed/1 holds for Pengine and
%   `[sandboxed(true)]` otherwise.

extra_load_options(Pengine, Options) :-
    pengine_not_sandboxed(Pengine),
    !,
    Options = [].
extra_load_options(_, [sandboxed(true)]).


%   keep_source(+Pengine, +ID, +SrcText)
%
%   If the setting Application:debug_info is `true`, store the source
%   text as a string in pengine_data/2. Otherwise do nothing.

keep_source(Pengine, ID, SrcText) :-
    get_pengine_application(Pengine, Application),
    setting(Application:debug_info, true),
    !,
    to_string(SrcText, SrcString),
    assertz(pengine_data(Pengine, source(ID, SrcString))).
keep_source(_, _, _).

%   to_string(+Text, -String)
%
%   Pass a string unchanged; convert anything else with atom_string/2.

to_string(Text, String) :-
    (   string(Text)
    ->  String = Text
    ;   atom_string(Text, String)
    ).

                 /*******************************
                 *            SANDBOX           *
                 *******************************/

:- multifile
    sandbox:safe_primitive/1.

sandbox:safe_primitive(pengines:pengine_input(_, _)).
sandbox:safe_primitive(pengines:pengine_output(_)).
sandbox:safe_primitive(pengines:pengine_debug(_,_)).
3199 3200 3201 /******************************* 3202 * MESSAGES * 3203 *******************************/ 3204 3205prologerror_message(sandbox(time_limit_exceeded, Limit)) --> 3206 [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl, 3207 'This is normally caused by an insufficiently instantiated'-[], nl, 3208 'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl, 3209 'find all possible instantations of Var.'-[] 3210 ]
Pengines: Web Logic Programming Made Easy
The library(pengines) provides an infrastructure for creating Prolog engines in a (remote) pengine server and accessing these engines either from Prolog or JavaScript.