1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2002-2024, University of Amsterdam 7 VU University Amsterdam 8 CWI, Amsterdam 9 SWI-Prolog Solutions b.v. 10 All rights reserved. 11 12 Redistribution and use in source and binary forms, with or without 13 modification, are permitted provided that the following conditions 14 are met: 15 16 1. Redistributions of source code must retain the above copyright 17 notice, this list of conditions and the following disclaimer. 18 19 2. Redistributions in binary form must reproduce the above copyright 20 notice, this list of conditions and the following disclaimer in 21 the documentation and/or other materials provided with the 22 distribution. 23 24 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 32 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 33 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 34 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 POSSIBILITY OF SUCH DAMAGE. 
36*/ 37 38:- module(thread_httpd, 39 [ http_current_server/2, % ?:Goal, ?Port 40 http_server_property/2, % ?Port, ?Property 41 http_server/2, % :Goal, +Options 42 http_workers/2, % +Port, ?WorkerCount 43 http_add_worker/2, % +Port, +Options 44 http_current_worker/2, % ?Port, ?ThreadID 45 http_stop_server/2, % +Port, +Options 46 http_spawn/2, % :Goal, +Options 47 48 http_requeue/1, % +Request 49 http_close_connection/1, % +Request 50 http_enough_workers/3 % +Queue, +Why, +Peer 51 ]). 52:- use_module(library(debug)). 53:- use_module(library(error)). 54:- use_module(library(option)). 55:- use_module(library(socket)). 56:- use_module(library(thread_pool)). 57:- use_module(library(gensym)). 58:- use_module(http_wrapper). 59:- use_module(http_path). 60:- use_module(http_stream). 61 62:- autoload(library(uri), [uri_resolve/3]). 63:- autoload(library(aggregate), [aggregate_all/3]). 64 65:- predicate_options(http_server/2, 2, 66 [ port(any), 67 unix_socket(atom), 68 entry_page(atom), 69 tcp_socket(any), 70 workers(positive_integer), 71 timeout(number), 72 keep_alive_timeout(number), 73 silent(boolean), 74 ssl(list(any)), % if http/http_ssl_plugin is loaded 75 pass_to(system:thread_create/3, 3) 76 ]). 77:- predicate_options(http_spawn/2, 2, 78 [ pool(atom), 79 pass_to(system:thread_create/3, 3), 80 pass_to(thread_pool:thread_create_in_pool/4, 4) 81 ]). 82:- predicate_options(http_add_worker/2, 2, 83 [ timeout(number), 84 keep_alive_timeout(number), 85 max_idle_time(number), 86 pass_to(system:thread_create/3, 3) 87 ]).
% Meta-argument declarations for the exported predicates.
% NOTE(review): the argument specifiers were lost in extraction and are
% restored here from the upstream library (closure called with the
% request -> 1, plain goal -> 0, module-sensitive option list -> `:`).
% Confirm against the reference copy of this file.
:- meta_predicate
    http_server(1, :),
    http_current_server(1, ?),
    http_spawn(0, :).

% Global server administration.
:- dynamic
    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
    queue_worker/2,         % Queue, ThreadID
    queue_options/2.        % Queue, Options

% Hooks that plugins may define to replace parts of the socket handling
% and worker scheduling.
:- multifile
    make_socket_hook/3,
    accept_hook/2,
    close_hook/1,
    open_client_hook/6,
    discard_client_hook/1,
    http:create_pool/1,
    http:schedule_workers/1.

% thread_repeat_wait/1 passes its argument to call/1.
:- meta_predicate
    thread_repeat_wait(0).
AF_UNIX). See socket_create/2.main thread.
If you need to control resource usage you may consider the
spawn option of http_handler/3 and library(thread_pool).
silent(Bool): If true (default false), do not print an informational
message that the server was started.

A typical initialization for an HTTP server that uses http_dispatch/1 to relay requests to predicates is:
:- use_module(library(http/thread_httpd)).
:- use_module(library(http/http_dispatch)).
start_server(Port) :-
http_server(http_dispatch, [port(Port)]).
Note that multiple servers can coexist in the same Prolog process. A notable application of this is to have both an HTTP and HTTPS server, where the HTTP server redirects to the HTTPS server for handling sensitive requests.
%!  http_server(:Goal, :Options) is det.
%
%   Start an HTTP server that runs Goal for the requests it receives.
%   The address is taken from the port(Port) or unix_socket(Path)
%   option.  The predicate creates the socket, the worker threads and
%   the accept thread (in that order) and, unless silent(true) is
%   given, prints the informational message httpd_started_server/2.
%
%   @error existence_error(server_address, Options) if Options holds
%          neither port(_) nor unix_socket(_).

http_server(Goal, M:Options0) :-
    server_address(Address, Options0),
    !,
    make_socket(Address, M:Options0, Options),
    create_workers(Options),
    create_server(Goal, Address, Options),
    (   option(silent(true), Options0)
    ->  true
    ;   print_message(informational,
                      httpd_started_server(Address, Options0))
    ).
http_server(_Goal, _:Options0) :-
    existence_error(server_address, Options0).

%!  server_address(-Address, +Options) is semidet.
%
%   Address is the value of port(Port) if present, else
%   unix_socket(Path) if that option is present.  Fails otherwise.

server_address(Address, Options) :-
    (   option(port(Port), Options)
    ->  Address = Port
    ;   option(unix_socket(Path), Options)
    ->  Address = unix_socket(Path)
    ).

%!  address_port(+Address, -Port) is det.
%
%   Strip the interface from `Iface:Port`, map unix_socket(Path) to
%   Path and pass any other address through unchanged.

address_port(_IFace:Port, Port) :- !.
address_port(unix_socket(Path), Path) :- !.
address_port(Address, Address) :- !.

%!  tcp_address(@Address) is semidet.
%
%   True if Address is unbound, an integer or a term `Iface:Port`.

tcp_address(Port) :-
    var(Port),
    !.
tcp_address(Port) :-
    integer(Port),
    !.
tcp_address(_Iface:_Port).

%!  address_domain(+Address, -Domain) is det.
%
%   Domain is `inet6` if the interface of `Iface:Port` resolves through
%   ip_name/2 to an ip/8 term and `inet` in all other cases, including
%   `localhost:Port` and addresses without an interface.

address_domain(localhost:_Port, Domain) =>
    Domain = inet.
address_domain(Iface:_Port, Domain) =>
    (   catch(ip_name(IP, Iface), error(_,_), fail),
        functor(IP, ip, 8)
    ->  Domain = inet6
    ;   Domain = inet
    ).
address_domain(_, Domain) =>
    Domain = inet.
queue(QueueId).
%!  make_socket(+Address, :OptionsIn, -OptionsOut) is det.
%
%   Prepare the listening socket for Address.  OptionsOut extends
%   OptionsIn with queue(Queue) and, when the socket is created here,
%   tcp_socket(Socket).  The clauses are tried in this order:
%
%     1. For a TCP address, the multifile hook make_socket_hook/3.
%     2. If the caller passed tcp_socket(_), only queue(Queue) is added.
%     3. For a TCP address, create a socket in the domain given by
%        address_domain/2, set `reuseaddr`, bind and listen with a
%        backlog of 64.
%     4. For unix_socket(Path), bind a Unix domain socket.  This clause
%        is only compiled if unix_domain_socket/1 exists.

make_socket(Address, M:Options0, Options) :-
    tcp_address(Address),
    make_socket_hook(Address, M:Options0, Options),
    !.
make_socket(Address, _:Options0, Options) :-
    option(tcp_socket(_), Options0),
    !,
    make_addr_atom('httpd', Address, Queue),
    Options = [ queue(Queue)
              | Options0
              ].
make_socket(Address, _:Options0, Options) :-
    tcp_address(Address),
    !,
    address_domain(Address, Domain),
    socket_create(Socket, [domain(Domain)]),
    tcp_setopt(Socket, reuseaddr),
    tcp_bind(Socket, Address),
    tcp_listen(Socket, 64),
    make_addr_atom('httpd', Address, Queue),
    Options = [ queue(Queue),
                tcp_socket(Socket)
              | Options0
              ].
:- if(current_predicate(unix_domain_socket/1)).
make_socket(Address, _:Options0, Options) :-
    Address = unix_socket(Path),
    !,
    unix_domain_socket(Socket),
    tcp_bind(Socket, Path),
    tcp_listen(Socket, 64),
    make_addr_atom('httpd', Address, Queue),
    Options = [ queue(Queue),
                tcp_socket(Socket)
              | Options0
              ].
:- endif.
%!  make_addr_atom(+Scheme, +Address, -Atom) is det.
%
%   Atom is the concatenation of Scheme, `@` and the textual form of
%   Address, e.g. `'httpd@localhost:8080'`.  It is used for message
%   queue names and thread aliases.
%
%   @error instantiation_error if (part of) Address is unbound.
%   @error domain_error(http_server_address, Address) if Address is
%          not one of the shapes handled by address_parts//1.

make_addr_atom(Scheme, Address, Atom) :-
    phrase(address_parts(Address), Parts),
    atomic_list_concat([Scheme,@|Parts], Atom).

%!  address_parts(+Address)// is det.
%
%   Emit the atomic pieces of Address.  Handles atomic values,
%   `Host:Port`, ip(A,B,C,D) and unix_socket(Path).

address_parts(Var) -->
    { var(Var),
      !,
      instantiation_error(Var)
    }.
address_parts(Atomic) -->
    { atomic(Atomic) },
    !,
    [Atomic].
address_parts(Host:Port) -->
    !,
    address_parts(Host), [:], address_parts(Port).
address_parts(ip(A,B,C,D)) -->
    !,
    [ A, '.', B, '.', C, '.', D ].
address_parts(unix_socket(Path)) -->
    [Path].
address_parts(Address) -->
    { domain_error(http_server_address, Address) }.
%!  create_server(:Goal, +Address, +Options) is det.
%
%   Start the accept thread for a server.  The thread runs
%   accept_server/3 under an alias built from the scheme and the port.
%   The caller blocks until that thread sends `server_started`, after
%   which the server is registered in current_server/6.

create_server(Goal, Address, Options) :-
    get_time(StartTime),
    memberchk(queue(Queue), Options),
    scheme(Scheme, Options),
    autoload_https(Scheme),
    address_port(Address, Port),
    make_addr_atom(Scheme, Port, Alias),
    thread_self(Initiator),
    thread_create(accept_server(Goal, Initiator, Options), _,
                  [ alias(Alias),
                    class(service)
                  ]),
    thread_get_message(server_started),
    assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).

%!  scheme(-Scheme, +Options) is det.
%
%   Scheme is the value of the scheme(Scheme) option if given, `https`
%   if Options holds ssl(_) or ssl_instance(_), and `http` otherwise.

scheme(Scheme, Options) :-
    option(scheme(Scheme), Options),
    !.
scheme(Scheme, Options) :-
    (   option(ssl(_), Options)
    ;   option(ssl_instance(_), Options)
    ),
    !,
    Scheme = https.
scheme(http, _).

%!  autoload_https(+Scheme) is det.
%
%   For `https`, load library(http/http_ssl_plugin) if no clause for
%   accept_hook/2 is defined yet and the library source exists.  Does
%   nothing in all other cases.

autoload_https(https) :-
    \+ clause(accept_hook(_Goal, _Options), _),
    exists_source(library(http/http_ssl_plugin)),
    !,
    use_module(library(http/http_ssl_plugin)).
autoload_https(_).
%!  http_current_server(:Goal, ?Port) is nondet.
%
%   True if Goal is the goal of a server registered in current_server/6
%   for Port.

http_current_server(Goal, Port) :-
    current_server(Port, Goal, _, _, _, _).

%!  http_server_property(?Port, ?Property) is nondet.
%
%   True if Property is a property of the server at Port.  If Port is
%   `_:Integer`, only the integer is used for the lookup.  Properties:
%
%     - goal(:Goal)
%       Goal registered for the server.
%     - scheme(-Scheme)
%       Scheme is one of `http` or `https`.
%     - start_time(-Time)
%       Time stamp recorded when the server was created.

http_server_property(_:Port, Property) :-
    integer(Port),
    !,
    server_property(Property, Port).
http_server_property(Port, Property) :-
    server_property(Property, Port).

% server_property(?Property, ?Port): per-property projection of
% current_server/6.
server_property(goal(Goal), Port) :-
    current_server(Port, Goal, _, _, _, _).
server_property(scheme(Scheme), Port) :-
    current_server(Port, _, _, _, Scheme, _).
server_property(start_time(Time), Port) :-
    current_server(Port, _, _, _, _, Time).
%!  http_workers(+Port, ?Workers) is nondet.
%
%   Query or change the number of workers of the server at Port.  If
%   Workers is an integer, the pool is resized to that number using
%   resize_pool/2.  Otherwise Workers is unified with the number of
%   queue_worker/2 facts for the queue of each matching server.
%
%   @error existence_error(http_server, Port) when resizing and no
%          server is registered for Port.

http_workers(Port, Workers) :-
    integer(Workers),
    !,
    must_be(ground, Port),
    (   current_server(Port, _, _, Queue, _, _)
    ->  resize_pool(Queue, Workers)
    ;   existence_error(http_server, Port)
    ).
http_workers(Port, Workers) :-
    current_server(Port, _, _, Queue, _, _),
    aggregate_all(count, queue_worker(Queue, _Worker), Workers).
%!  http_add_worker(+Port, +Options) is det.
%
%   Add one worker to the server at Port.  Options are merged with the
%   options stored for the queue in queue_options/2 by merge_options/3,
%   with Options passed as its first argument.
%
%   @error existence_error(http_server, Port) if no server is
%          registered for Port.

http_add_worker(Port, Options) :-
    must_be(ground, Port),
    current_server(Port, _, _, Queue, _, _),
    !,
    queue_options(Queue, QueueOptions),
    merge_options(Options, QueueOptions, WorkerOptions),
    atom_concat(Queue, '_', AliasBase),
    create_workers(1, 1, Queue, AliasBase, WorkerOptions).
http_add_worker(Port, _) :-
    existence_error(http_server, Port).
%!  http_current_worker(?Port, ?ThreadID) is nondet.
%
%   True if ThreadID is registered in queue_worker/2 for the queue of
%   the server at Port.

http_current_worker(Port, ThreadID) :-
    current_server(Port, _, _, Queue, _, _),
    queue_worker(Queue, ThreadID).

%!  accept_server(:Goal, +Initiator, +Options)
%
%   Body of the accept thread.  Runs accept_server2/3 until the
%   exception http_stop(Stopper) is caught.  Then it unregisters the
%   server, closes the clients still waiting in the queue, closes the
%   server socket and sends `http_stopped` to Stopper.

accept_server(Goal, Initiator, Options) :-
    Ex = http_stop(Stopper),
    catch(accept_server2(Goal, Initiator, Options), Ex, true),
    thread_self(Thread),
    debug(http(stop), '[~p]: accept server received ~p', [Thread, Ex]),
    retract(current_server(_Port, _, Thread, Queue, _Scheme, _StartTime)),
    close_pending_accepts(Queue),
    close_server_socket(Options),
    thread_send_message(Stopper, http_stopped).

% accept_server2/3 tells Initiator that the server is running and then
% loops (failure driven) over accept_server3/2.  Exceptions accepted by
% accept_rethrow_error/1 are re-thrown; other exceptions and failure of
% accept_server3/2 are printed as errors and the loop continues.
accept_server2(Goal, Initiator, Options) :-
    thread_send_message(Initiator, server_started),
    repeat,
      (   catch(accept_server3(Goal, Options), E, true)
      ->  (   var(E)
          ->  fail
          ;   accept_rethrow_error(E)
          ->  throw(E)
          ;   print_message(error, E),
              fail
          )
      ;   print_message(error,      % internal error
                        goal_failed(accept_server3(Goal, Options))),
          fail
      ).

% accept_server3/2 accepts a single connection: either through the
% accept_hook/2 plugin or by calling tcp_accept/3 and posting the client
% to the worker queue.
accept_server3(Goal, Options) :-
    accept_hook(Goal, Options),
    !.
accept_server3(Goal, Options) :-
    memberchk(tcp_socket(Socket), Options),
    memberchk(queue(Queue), Options),
    debug(http(connection), 'Waiting for connection', []),
    tcp_accept(Socket, Client, Peer),
    % sig_atomic/1: do not process signals while handing over Client.
    sig_atomic(send_to_worker(Queue, Client, Goal, Peer)),
    http_enough_workers(Queue, accept, Peer).

% send_to_worker/4 posts tcp_client(Client, Goal, Peer) to Queue.
send_to_worker(Queue, Client, Goal, Peer) :-
    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
    thread_send_message(Queue, tcp_client(Client, Goal, Peer)).

% Exceptions that terminate the accept loop.
accept_rethrow_error(http_stop(_)).
%!  close_server_socket(+Options)
%
%   Close the server socket.  The close_hook/1 plugin is tried first;
%   if it does not succeed the tcp_socket(Socket) from Options is
%   closed.  Fails if neither applies.

close_server_socket(Options) :-
    close_hook(Options),
    !.
close_server_socket(Options) :-
    memberchk(tcp_socket(Socket), Options),
    !,
    tcp_close_socket(Socket).
%!  close_pending_accepts(+Queue) is det.
%
%   Drain Queue without waiting (timeout 0) and close every client
%   message found in it.

close_pending_accepts(Queue) :-
    (   thread_get_message(Queue, Msg, [timeout(0)])
    ->  close_client(Msg),
        close_pending_accepts(Queue)
    ;   true
    ).

% close_client/1 closes the socket of a tcp_client/3 message.  Any other
% message is offered to discard_client_hook/1; if the hook does not
% succeed a warning is printed.
close_client(tcp_client(Client, _Goal, _0Peer)) =>
    debug(http(stop), 'Closing connection from ~p during shut-down', [_0Peer]),
    tcp_close_socket(Client).
close_client(Msg) =>
    (   discard_client_hook(Msg)
    ->  true
    ;   print_message(warning, http_close_client(Msg))
    ).
%!  http_stop_server(+Port, +Options) is det.
%
%   Stop the server at Port (or `Host:Port` with Host ground).  The
%   worker pool is first resized to 0.  The accept thread is then
%   signalled with http_stop(Stopper).  If no `http_stopped` message
%   arrives within 0.1 seconds, a connection to `localhost:Port` is
%   attempted and any error from it is ignored.
%   NOTE(review): presumably this wakes an accept thread blocked in
%   tcp_accept/3 -- confirm.
%   Finally the accept thread is joined and the queue is destroyed.
%   Options is currently ignored.

http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
    ground(Host),
    !,
    http_stop_server(Port, Options).
http_stop_server(Port, _Options) :-
    http_workers(Port, 0),                  % checks Port is ground
    current_server(Port, _, Thread, Queue, _Scheme, _Start),
    retractall(queue_options(Queue, _)),
    debug(http(stop), 'Signalling HTTP server thread ~p to stop', [Thread]),
    thread_self(Stopper),
    thread_signal(Thread, throw(http_stop(Stopper))),
    (   thread_get_message(Stopper, http_stopped, [timeout(0.1)])
    ->  true
    ;   catch(connect(localhost:Port), _, true)
    ),
    thread_join(Thread, _0Status),
    debug(http(stop), 'Joined HTTP server thread ~p (~p)', [Thread, _0Status]),
    message_queue_destroy(Queue).

% connect/1 opens a TCP connection to Address; the socket is always
% closed again by the cleanup handler.
connect(Address) :-
    setup_call_cleanup(
        tcp_socket(Socket),
        tcp_connect(Socket, Address),
        tcp_close_socket(Socket)).
%!  http_enough_workers(+Queue, +Why, +Peer) is semidet.
%
%   Check whether Queue has enough workers.  If threads are waiting on
%   Queue nothing needs to be done.  Otherwise the queue size is
%   checked with enough/2.  If it is too large the multifile hook
%   http:schedule_workers/1 is called with a dict holding the keys
%   `port`, `reason`, `peer`, `waiting` and `queue`.  Exceptions from
%   the hook are printed as errors; failure of the hook is ignored.
%   The predicate fails only if the queue size cannot be obtained.

http_enough_workers(Queue, _Why, _Peer) :-
    message_queue_property(Queue, waiting(_0)),
    !,
    debug(http(scheduler), '~D waiting for work; ok', [_0]).
http_enough_workers(Queue, Why, Peer) :-
    message_queue_property(Queue, size(Size)),
    (   enough(Size, Why)
    ->  debug(http(scheduler), '~D in queue; ok', [Size])
    ;   current_server(Port, _, _, Queue, _, _),
        Data = _{ port:Port,
                  reason:Why,
                  peer:Peer,
                  waiting:Size,
                  queue:Queue
                },
        debug(http(scheduler), 'Asking to reschedule: ~p', [Data]),
        catch(http:schedule_workers(Data),
              Error,
              print_message(error, Error))
    ->  true
    ;   true
    ).

% enough(+QueueSize, +Why): QueueSize is acceptable for reason Why.
enough(0, _).
enough(1, keep_alive).                  % I will be ready myself
accept for a new connection or keep_alive if a
worker tries to reschedule itself.
Note that, when called with reason:accept, we are called in
the time critical main accept loop. An implementation of this
hook shall typically send the event to a thread dedicated to
dynamic worker-pool management.
626 /******************************* 627 * WORKER QUEUE OPERATIONS * 628 *******************************/
%!  create_workers(+Options) is det.
%
%   Create the message queue named by queue(Queue) and start
%   workers(N) worker threads (default 5) reading from it.  Options is
%   stored in queue_options/2 for workers that are added later.

create_workers(Options) :-
    option(workers(N), Options, 5),
    option(queue(Queue), Options),
    % Errors are ignored. NOTE(review): presumably the queue may already
    % exist -- confirm.
    catch(message_queue_create(Queue), _, true),
    atom_concat(Queue, '_', AliasBase),
    create_workers(1, N, Queue, AliasBase, Options),
    assert(queue_options(Queue, Options)).

%!  create_workers(+I, +N, +Queue, +AliasBase, +Options) is det.
%
%   Create workers numbered I..N.  Each worker runs http_worker/1 in a
%   thread of class `http` whose alias is generated from AliasBase by
%   gensym/2.  Options is also passed on to thread_create/3.  The new
%   thread is recorded in queue_worker/2.

create_workers(I, N, _, _, _) :-
    I > N,
    !.
create_workers(I, N, Queue, AliasBase, Options) :-
    gensym(AliasBase, Alias),
    thread_create(http_worker(Options), Id,
                  [ alias(Alias),
                    class(http)
                  | Options
                  ]),
    assertz(queue_worker(Queue, Id)),
    I2 is I + 1,
    create_workers(I2, N, Queue, AliasBase, Options).
%!  resize_pool(+Queue, +Size) is det.
%
%   Bring the number of workers registered for Queue to Size.  Missing
%   workers are created with the options stored in queue_options/2.
%   For each excess worker a quit(Me) message is posted to Queue, after
%   which the caller waits for the same number of quitted(_) replies.

resize_pool(Queue, Size) :-
    findall(W, queue_worker(Queue, W), Workers),
    length(Workers, Now),
    (   Now < Size
    ->  queue_options(Queue, Options),
        atom_concat(Queue, '_', AliasBase),
        I0 is Now+1,
        create_workers(I0, Size, Queue, AliasBase, Options)
    ;   Now == Size
    ->  true
    ;   Now > Size
    ->  Excess is Now - Size,
        thread_self(Me),
        forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
        forall(between(1, Excess, _), thread_get_message(quitted(_)))
    ).
If the message quit(Sender) is read from the queue, the worker
stops.
%!  http_worker(+Options)
%
%   Body of a worker thread.  It registers done_worker/0 as exit
%   handler and then loops (failure driven, through
%   thread_repeat_wait/1) over the messages in the queue(Queue) from
%   Options:
%
%     - quit(Sender) ends the loop.  The thread detaches itself and,
%       unless Sender is `idle`, removes its queue_worker/2 fact and
%       replies quitted(Self) to Sender.
%     - Any other message is opened with open_client/7 and handled by
%       http_process/4.  If that raises or fails, the error is printed
%       at the level given by current_message_level/2 and the
%       connection is closed.
%
%   The max_idle_time(MaxIdle) option (default `infinite`) bounds how
%   long get_work/3 waits for a message.

http_worker(Options) :-
    debug(http(scheduler), 'New worker', []),
    prolog_listen(this_thread_exit, done_worker),
    option(queue(Queue), Options),
    option(max_idle_time(MaxIdle), Options, infinite),
    thread_repeat_wait(get_work(Queue, Message, MaxIdle)),
      debug(http(worker), 'Waiting for a job ...', []),
      debug(http(worker), 'Got job ~p', [Message]),
      (   Message = quit(Sender)
      ->  !,
          thread_self(Self),
          thread_detach(Self),
          (   Sender == idle
          ->  true
          ;   retract(queue_worker(Queue, Self)),
              thread_send_message(Sender, quitted(Self))
          )
      ;   open_client(Message, Queue, Goal, In, Out,
                      Options, ClientOptions),
          (   catch(http_process(Goal, In, Out, ClientOptions),
                    Error, true)
          ->  true
          ;   Error = goal_failed(http_process/4)
          ),
          debug_reset_from_class,   % Restore debug mode after user nodebug
          (   var(Error)
          ->  fail
          ;   current_message_level(Error, Level),
              print_message(Level, Error),
              memberchk(peer(Peer), ClientOptions),
              close_connection(Peer, In, Out),
              fail
          )
      ).

%!  get_work(+Queue, -Message, +MaxIdle) is det.
%
%   Fetch the next Message from Queue.  If MaxIdle is `infinite` this
%   blocks until a message arrives.  Otherwise Message is quit(idle) if
%   nothing arrives within MaxIdle seconds.

get_work(Queue, Message, infinite) :-
    !,
    thread_get_message(Queue, Message).
get_work(Queue, Message, MaxIdle) :-
    (   thread_get_message(Queue, Message, [timeout(MaxIdle)])
    ->  true
    ;   Message = quit(idle)
    ).
%!  open_client(+Message, +Queue, -Goal, -In, -Out,
%!              +Options, -ClientOptions) is semidet.
%
%   Turn a queue message into a connection to work on.
%
%   A requeue(In, Out, Goal, ClOpts) message reuses an open
%   connection.  It is only accepted if
%   check_keep_alive_connection/5 finds pending input within
%   keep_alive_timeout(Seconds) (default 2).
%
%   Any other message is opened through open_client/6.  Errors from it
%   are printed and cause failure.  ClientOptions then starts with
%   pool(client(Queue, Goal, In, Out)) and timeout(Timeout), where
%   Timeout defaults to 60.

open_client(requeue(In, Out, Goal, ClOpts),
            _, Goal, In, Out, Opts, ClOpts) :-
    !,
    memberchk(peer(Peer), ClOpts),
    option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
    check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out).
open_client(Message, Queue, Goal, In, Out, Opts,
            [ pool(client(Queue, Goal, In, Out)),
              timeout(Timeout)
            | Options
            ]) :-
    catch(open_client(Message, Goal, In, Out, Options, Opts),
          E, report_error(E)),
    option(timeout(Timeout), Opts, 60),
    (   debugging(http(connection))
    ->  memberchk(peer(Peer), Options),
        debug(http(connection), 'Opened connection from ~p', [Peer])
    ;   true
    ).
%!  open_client(+Message, -Goal, -In, -Out,
%!              -ClientOptions, +Options) is semidet.
%
%   Open the streams for a newly accepted client.  The
%   open_client_hook/6 plugin is tried first.  Otherwise a
%   tcp_client(Socket, Goal, Peer) message is opened with
%   tcp_open_socket/3 and ClientOptions is
%   `[peer(Peer), protocol(http)]`.

open_client(Message, Goal, In, Out, ClientOptions, Options) :-
    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
    !.
open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
            [ peer(Peer),
              protocol(http)
            ], _) :-
    tcp_open_socket(Socket, In, Out).

% report_error/1 prints E as an error and fails.
report_error(E) :-
    print_message(error, E),
    fail.
%!  check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet.
%
%   Wait up to TimeOut seconds for input on In by peeking a character.
%   On success the previous timeout of In is restored.  If the peek
%   raises an exception or reports end-of-file, the connection is
%   closed and the predicate fails.

check_keep_alive_connection(In, TMO, Peer, In, Out) :-
    stream_property(In, timeout(Old)),
    set_stream(In, timeout(TMO)),
    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
    catch(peek_code(In, Code), E, true),
    (   var(E),                     % no exception
        Code \== -1                 % no end-of-file
    ->  set_stream(In, timeout(Old)),
        debug(http(keep_alive), '\tre-using keep-alive connection', [])
    ;   (   Code == -1
        ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
        ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
        ),
        close_connection(Peer, In, Out),
        fail
    ).

%!  done_worker
%
%   Exit handler of a worker thread, installed by http_worker/1.  If
%   the thread is still registered in queue_worker/2 it is
%   unregistered and recreate_worker/2 is tried with its exit status.
%   A message is printed for either the replacement or the stop.  The
%   second clause handles a thread that is no longer registered.

done_worker :-
    thread_self(Self),
    thread_detach(Self),
    retract(queue_worker(Queue, Self)),
    thread_property(Self, status(Status)),
    !,
    (   catch(recreate_worker(Status, Queue), _, fail)
    ->  print_message(informational,
                      httpd_restarted_worker(Self))
    ;   done_status_message_level(Status, Level),
        print_message(Level,
                      httpd_stopped_worker(Self, Status))
    ).
done_worker :-                                  % received quit(Sender)
    thread_self(Self),
    thread_property(Self, status(Status)),
    done_status_message_level(Status, Level),
    print_message(Level,
                  httpd_stopped_worker(Self, Status)).

% done_status_message_level(+Status, -Level): normal termination and
% abort/halt unwinding are reported silently, anything else as
% informational.
done_status_message_level(true, silent) :- !.
done_status_message_level(exception('$aborted'), silent) :- !.
done_status_message_level(exception(unwind(abort)), silent) :- !.
done_status_message_level(exception(unwind(halt(_))), silent) :- !.
done_status_message_level(_, informational).
The first clause deals with the possibility that we cannot write to
user_error. This is possible when Prolog is started as a service
using some service managers. Would be nice if we could write an
error, but where?
%!  recreate_worker(+Status, +Queue) is semidet.
%
%   Decide what to do with a worker that terminated with Status.  A
%   write error on `user_error` halts the process with exit code 2.
%   For the exceptions listed in recreate_on_error/1 a replacement
%   worker is created with the stored queue options.  Fails for any
%   other status.

recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
    halt(2).
recreate_worker(exception(Error), Queue) :-
    recreate_on_error(Error),
    queue_options(Queue, Options),
    atom_concat(Queue, '_', AliasBase),
    create_workers(1, 1, Queue, AliasBase, Options).

% Exceptions after which a worker is replaced.
recreate_on_error('$aborted').
recreate_on_error(unwind(abort)).
recreate_on_error(time_limit_exceeded).
%!  message_level(+Exception, -Level) is semidet.
%
%   Multifile table that maps an error raised while processing a
%   request to the level used for printing it.

:- multifile
    message_level/2.

message_level(error(io_error(read, _), _),               silent).
message_level(error(socket_error(epipe,_), _),           silent).
message_level(error(http_write_short(_Obj,_Written), _), silent).
message_level(error(timeout_error(read, _), _),          informational).
message_level(keep_alive_timeout,                        silent).

%!  current_message_level(+Term, -Level) is det.
%
%   Level is the first level that message_level/2 gives for Term, or
%   `error` if there is none.

current_message_level(Term, Level) :-
    (   message_level(Term, Level)
    ->  true
    ;   Level = error
    ).
%!  read_remaining_request(+StartBody, +Request) is semidet.
%
%   Skip the part of the request body that the handler did not read.
%   The number of bytes left is StartBody + content_length minus the
%   current byte count of the input stream.  Succeeds without reading
%   if Request has no content_length(Len).  Fails if the remaining
%   bytes cannot be read.

read_remaining_request(StartBody, Request) :-
    memberchk(content_length(Len), Request),
    !,
    memberchk(pool(client(_Queue, _Goal, In, _Out)), Request),
    byte_count(In, Here),
    Left is StartBody+Len-Here,
    read_incomplete(In, Left).
read_remaining_request(_, _Request).

% read_incomplete(+In, +Left) copies Left bytes from In to a null
% stream.  An error(_,_) exception while copying makes it fail.
read_incomplete(_, 0) :-
    !.
read_incomplete(In, Left) :-
    % Left < 1 000 000,  % Optionally close anyway.
    catch(setup_call_cleanup(
              open_null_stream(Null),
              copy_stream_data(In, Null, Left),
              close(Null)),
          error(_,_),
          fail).
%!  http_requeue(+Header) is semidet.
%
%   Put a connection back into the worker queue so that a later
%   request on it can be handled.  Only the pool(_), peer(_) and
%   protocol(_) fields of Header are kept.  On failure a debug message
%   is printed on topic http(error) and the predicate fails.

http_requeue(Header) :-
    requeue_header(Header, ClientOptions),
    memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
    memberchk(peer(Peer), ClientOptions),
    http_enough_workers(Queue, keep_alive, Peer),
    thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
    !.
http_requeue(Header) :-
    debug(http(error), 'Re-queue failed: ~p', [Header]),
    fail.

% requeue_header(+Header, -Kept): Kept holds the elements of Header
% accepted by requeue_keep/1, in their original order.
requeue_header([], []).
requeue_header([H|T0], [H|T]) :-
    requeue_keep(H),
    !,
    requeue_header(T0, T).
requeue_header([_|T0], T) :-
    requeue_header(T0, T).

requeue_keep(pool(_)).
requeue_keep(peer(_)).
requeue_keep(protocol(_)).
%!  http_process(:Goal, +In, +Out, +Options)
%
%   Handle one request on the connection In/Out.  The timeout(Seconds)
%   option (default 60) is set on both streams.  The request is then
%   handled by http_wrapper/5 and next/3 decides what happens to the
%   connection.

http_process(Goal, In, Out, Options) :-
    debug(http(server), 'Running server goal ~p on ~p -> ~p',
          [Goal, In, Out]),
    option(timeout(TMO), Options, 60),
    set_stream(In, timeout(TMO)),
    set_stream(Out, timeout(TMO)),
    http_wrapper(Goal, In, Out, Connection,
                 [ request(Request),
                   byte_count(StartBody)
                 | Options
                 ]),
    next(Connection, StartBody, Request).

% next/3 never fails: if next_/3 fails a warning is printed instead.
next(Connection, StartBody, Request) :-
    next_(Connection, StartBody, Request), !.
next(Connection, StartBody, Request) :-
    print_message(warning, goal_failed(next(Connection,StartBody,Request))).

% next_(+Connection, +StartBody, +Request) handles the connection
% status:
%
%   - switch_protocol(Goal, _): call Goal on the streams.  If it fails
%     or raises (the error is printed), close the connection.
%   - spawned(ThreadId): nothing is done here.
%   - keep-alive (compared case-insensitively): skip the unread body
%     and requeue the connection.
%   - anything else, or a failed keep-alive: close the connection.
next_(switch_protocol(SwitchGoal, _SwitchOptions), _, Request) :-
    !,
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    (   catch(call(SwitchGoal, In, Out), E,
              (   print_message(error, E),
                  fail))
    ->  true
    ;   http_close_connection(Request)
    ).
next_(spawned(ThreadId), _, _) :-
    !,
    debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
next_(Connection, StartBody, Request) :-
    downcase_atom(Connection, 'keep-alive'),
    read_remaining_request(StartBody, Request),
    http_requeue(Request),
    !.
next_(_, _, Request) :-
    http_close_connection(Request).
%!  http_close_connection(+Request)
%
%   Close the connection of Request.  The streams are taken from its
%   pool(client(_, _, In, Out)) field and the peer from peer(Peer).

http_close_connection(Request) :-
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    memberchk(peer(Peer), Request),
    close_connection(Peer, In, Out).
%!  close_connection(+Peer, +In, +Out)
%
%   Close both streams of a connection with force(true).  Exceptions
%   raised while closing are ignored.  Peer is only used for the debug
%   message.

close_connection(Peer, In, Out) :-
    debug(http(connection), 'Closing connection from ~p', [Peer]),
    catch(close(In, [force(true)]), _, true),
    catch(close(Out, [force(true)]), _, true).

% Documentation fragment belonging to http_spawn/2:
% If a pool does not exist, this predicate calls the multifile hook
% create_pool/1 to create it. If this predicate succeeds the operation
% is retried.
%!  http_spawn(:Goal, +Options) is det.
%
%   Continue handling the current request by running Goal in a new
%   detached thread of class `http`.
%
%   If pool(Pool) is given the thread is created in that thread pool,
%   with the remaining options passed on.  If the pool is full,
%   http_reply(busy) is thrown.  If the pool does not exist and
%   create_pool/1 succeeds, the spawn is retried.  Other errors are
%   re-thrown.  Without pool(Pool) a plain thread is created.

http_spawn(Goal, Options) :-
    select_option(pool(Pool), Options, ThreadOptions),
    !,
    current_output(CGI),
    Error = error(Formal, _),
    catch(thread_create_in_pool(Pool,
                                wrap_spawned(CGI, Goal), Id,
                                [ detached(true),
                                  class(http)
                                | ThreadOptions
                                ]),
          Error,
          true),
    (   var(Formal)
    ->  http_spawned(Id)
    ;   Formal = resource_error(threads_in_pool(_))
    ->  throw(http_reply(busy))
    ;   Formal = existence_error(thread_pool, Pool),
        create_pool(Pool)
    ->  http_spawn(Goal, Options)
    ;   throw(Error)
    ).
http_spawn(Goal, Options) :-
    current_output(CGI),
    thread_create(wrap_spawned(CGI, Goal), Id,
                  [ detached(true),
                    class(http)
                  | Options
                  ]),
    http_spawned(Id).

% wrap_spawned(+CGI, :Goal) is the body of the spawned thread.  It
% makes CGI the current output, runs Goal through http_wrap_spawned/3
% and then handles the connection with next/3.
wrap_spawned(CGI, Goal) :-
    set_output(CGI),
    cgi_property(CGI, request(Request)),
    memberchk(input(Input), Request),
    byte_count(Input, StartBody),
    http_wrap_spawned(Goal, Request, Connection),
    next(Connection, StartBody, Request).
%!  create_pool(+Pool) is semidet.
%
%   Create the thread pool Pool on demand.  The multifile hook
%   http:create_pool/1 is tried first; a permission error for creating
%   Pool raised by the hook counts as success.  The second clause
%   prints an informational message and creates a pool of 10 threads.

create_pool(Pool) :-
    E = error(permission_error(create, thread_pool, Pool), _),
    catch(http:create_pool(Pool), E, true).
create_pool(Pool) :-
    print_message(informational, httpd(created_pool(Pool))),
    thread_pool_create(Pool, 10, []).


                 /*******************************
                 *         WAIT POLICIES        *
                 *******************************/

% thread_repeat_wait/1 passes its argument to call/1, hence the `0`.
% The specifier was lost in extraction and is restored here.
:- meta_predicate
    thread_repeat_wait(0).
%!  thread_repeat_wait(:Goal) is multi.
%
%   Acts as `repeat, thread_idle(Goal)`, choosing whether to use a
%   `long` or `short` idle time based on the average firing rate.  If
%   the average interval is below 0.05 seconds, Goal is called directly
%   instead of through thread_idle/2.

thread_repeat_wait(Goal) :-
    new_rate_mma(5, 1000, State),
    repeat,
      notrace,
      update_rate_mma(State, MMA),
      long(MMA, IsLong),
      (   IsLong == brief
      ->  call(Goal)
      ;   thread_idle(Goal, IsLong)
      ).

% long(+MMA, -Class): classify the average interval MMA (seconds) as
% `brief` (< 0.05), `short` (< 1) or `long`.
long(MMA, brief) :-
    MMA < 0.05,
    !.
long(MMA, short) :-
    MMA < 1,
    !.
long(_, long).
%!  new_rate_mma(+N, +Resolution, -State) is det.
%!  update_rate_mma(!State, -MMA) is det.
%
%   Maintain a modified moving average of the interval between calls
%   to update_rate_mma/2.  State is a term
%
%       rstate(Base, Last, MaxI, Resolution, N, MMA)
%
%   Base is the time origin, Last the previous time stamp in units of
%   1/Resolution seconds, MaxI the value of the `max_tagged_integer`
%   flag, N the weight of the average and MMA the current average in
%   the same units as Last.  The returned MMA is in seconds.  State is
%   updated destructively with nb_setarg/3.

new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :-
    current_prolog_flag(max_tagged_integer, MaxI),
    get_time(Base).

update_rate_mma(State, MMAr) :-
    State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
    get_time(Now),
    Stamp is round((Now-Base)*Resolution),
    (   Stamp > MaxI
    ->  nb_setarg(1, State, Now),   % restart the time origin
        nb_setarg(2, State, 0)
    ;   true
    ),
    Diff is Stamp-Last,
    nb_setarg(2, State, Stamp),
    MMA is round(((N-1)*MMA0+Diff)/N),
    nb_setarg(6, State, MMA),
    MMAr is MMA/float(Resolution).


                 /*******************************
                 *            MESSAGES          *
                 *******************************/

:- multifile
    prolog:message/3.

% These clauses extend the prolog:message//1 hook.  The heads must be
% module qualified; an unqualified `prologmessage` head would define an
% unrelated local non-terminal.
prolog:message(httpd_started_server(Port, Options)) -->
    [ 'Started server at '-[] ],
    http_root(Port, Options).
prolog:message(httpd_stopped_worker(Self, Status)) -->
    [ 'Stopped worker ~p: ~p'-[Self, Status] ].
prolog:message(httpd_restarted_worker(Self)) -->
    [ 'Replaced aborted worker ~p'-[Self] ].
prolog:message(httpd(created_pool(Pool))) -->
    [ 'Created thread-pool ~p of size 10'-[Pool], nl,
      'Create this pool at startup-time or define the hook ', nl,
      'http:create_pool/1 to avoid this message and create a ', nl,
      'pool that fits the usage-profile.'
    ].

% http_root(+Address, +Options)// emits the landing page as url(URI).
http_root(Address, Options) -->
    { landing_page(Address, URI, Options) },
    [ url(URI) ].

%!  landing_page(+Address, -URI, +Options) is det.
%
%   URI is the location to show for a server at Address.  For
%   `Host:Port` it is built from the server's scheme, Host and Port,
%   leaving out the port if it is the default for the scheme.  A bare
%   Port is treated as `localhost:Port`.  For unix_socket(Path), URI is
%   a descriptive string rather than a URL.

landing_page(Host:Port, URI, Options) :-
    !,
    must_be(atom, Host),
    must_be(integer, Port),
    http_server_property(Port, scheme(Scheme)),
    (   default_port(Scheme, Port)
    ->  format(atom(Base), '~w://~w', [Scheme, Host])
    ;   format(atom(Base), '~w://~w:~w', [Scheme, Host, Port])
    ),
    entry_page(Base, URI, Options).
landing_page(unix_socket(Path), URI, _Options) :-
    !,
    format(string(URI), 'Unix domain socket "~w"', [Path]).
landing_page(Port, URI, Options) :-
    landing_page(localhost:Port, URI, Options).

default_port(http, 80).
default_port(https, 443).

% entry_page(+Base, -URI, +Options): resolve the entry_page(Entry)
% option, or else the location of root(.), against Base.
entry_page(Base, URI, Options) :-
    option(entry_page(Entry), Options),
    !,
    uri_resolve(Entry, Base, URI).
entry_page(Base, URI, _) :-
    http_absolute_location(root(.), Entry, []),
    uri_resolve(Entry, Base, URI).
/** <module> Threaded HTTP server
Most code doesn't need to use this directly; instead use library(http/http_server), which combines this library with the typical HTTP libraries that most servers need.
This library defines the HTTP server frontend of choice for SWI-Prolog. It is based on the multi-threading capabilities of SWI-Prolog and thus exploits multiple cores to serve requests concurrently. The server scales well and can cooperate with library(thread_pool) to control the number of concurrent requests of a given type. For example, it can be configured to handle 200 file download requests concurrently, 2 requests that potentially use a lot of memory and 8 requests that use a lot of CPU resources.
On Unix systems, this library can be combined with library(http/http_unix_daemon) to realise a proper Unix service process that creates a web server at port 80, runs under a specific account, optionally detaches from the controlling terminal, etc.
Combined with library(http/http_ssl_plugin) from the SSL package, this library can be used to create an HTTPS server. See <plbase>/doc/packages/examples/ssl/https for an example server using a self-signed SSL certificate. */