1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2002-2024, University of Amsterdam 7 VU University Amsterdam 8 CWI, Amsterdam 9 SWI-Prolog Solutions b.v. 10 All rights reserved. 11 12 Redistribution and use in source and binary forms, with or without 13 modification, are permitted provided that the following conditions 14 are met: 15 16 1. Redistributions of source code must retain the above copyright 17 notice, this list of conditions and the following disclaimer. 18 19 2. Redistributions in binary form must reproduce the above copyright 20 notice, this list of conditions and the following disclaimer in 21 the documentation and/or other materials provided with the 22 distribution. 23 24 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 32 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 33 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 34 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 POSSIBILITY OF SUCH DAMAGE. 
36*/ 37 38:- module(thread_httpd, 39 [ http_current_server/2, % ?:Goal, ?Port 40 http_server_property/2, % ?Port, ?Property 41 http_server/2, % :Goal, +Options 42 http_workers/2, % +Port, ?WorkerCount 43 http_add_worker/2, % +Port, +Options 44 http_current_worker/2, % ?Port, ?ThreadID 45 http_stop_server/2, % +Port, +Options 46 http_spawn/2, % :Goal, +Options 47 48 http_requeue/1, % +Request 49 http_close_connection/1, % +Request 50 http_enough_workers/3 % +Queue, +Why, +Peer 51 ]). 52:- use_module(library(debug)). 53:- use_module(library(error)). 54:- use_module(library(option)). 55:- use_module(library(socket)). 56:- use_module(library(thread_pool)). 57:- use_module(library(gensym)). 58:- use_module(http_wrapper). 59:- use_module(http_path). 60 61:- autoload(library(uri), [uri_resolve/3]). 62:- autoload(library(aggregate), [aggregate_all/3]). 63 64:- predicate_options(http_server/2, 2, 65 [ port(any), 66 unix_socket(atom), 67 entry_page(atom), 68 tcp_socket(any), 69 workers(positive_integer), 70 timeout(number), 71 keep_alive_timeout(number), 72 silent(boolean), 73 ssl(list(any)), % if http/http_ssl_plugin is loaded 74 pass_to(system:thread_create/3, 3) 75 ]). 76:- predicate_options(http_spawn/2, 2, 77 [ pool(atom), 78 pass_to(system:thread_create/3, 3), 79 pass_to(thread_pool:thread_create_in_pool/4, 4) 80 ]). 81:- predicate_options(http_add_worker/2, 2, 82 [ timeout(number), 83 keep_alive_timeout(number), 84 max_idle_time(number), 85 pass_to(system:thread_create/3, 3) 86 ]).
% Meta-argument declarations for the public API.  The goal arguments
% are module-sensitive; http_server/2 receives its options as
% Module:Options (see the clause head of http_server/2).
:- meta_predicate
    http_server(1, :),
    http_current_server(1, ?),
    http_spawn(0, +).

% Server administration.  All three tables are keyed on the message
% queue that connects the accept thread with its workers.
:- dynamic
    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
    queue_worker/2,         % Queue, ThreadID
    queue_options/2.        % Queue, Options

% Hooks that allow plugins to replace socket handling and to manage
% thread pools and worker scheduling.
:- multifile
    make_socket_hook/3,
    accept_hook/2,
    close_hook/1,
    open_client_hook/6,
    discard_client_hook/1,
    http:create_pool/1,
    http:schedule_workers/1.

% thread_repeat_wait/1 calls its argument using call/1.
:- meta_predicate
    thread_repeat_wait(0).
AF_UNIX
). See socket_create/2.main
thread.
If you need to control resource usage you may consider the
spawn
option of http_handler/3 and library(thread_pool).true
(default false
), do not print an informational
message that the server was started.

A typical initialization for an HTTP server that uses http_dispatch/1 to relay requests to predicates is:
    :- use_module(library(http/thread_httpd)).
    :- use_module(library(http/http_dispatch)).

    start_server(Port) :-
        http_server(http_dispatch, [port(Port)]).
Note that multiple servers can coexist in the same Prolog process. A notable application of this is to have both an HTTP and HTTPS server, where the HTTP server redirects to the HTTPS server for handling sensitive requests.
%!  http_server(:Goal, :Options) is det.
%
%   Start an HTTP server.  The listen address is derived from the
%   port(Port) or unix_socket(Path) option.  The socket, the workers
%   and the accept thread are created in that order.  Unless Options
%   contains silent(true), an informational message is printed.
%
%   @error existence_error(server_address, Options) if Options holds
%          neither port(Port) nor unix_socket(Path).

http_server(Goal, Module:UserOptions) :-
    server_address(Address, UserOptions),
    !,
    make_socket(Address, Module:UserOptions, ServerOptions),
    create_workers(ServerOptions),
    create_server(Goal, Address, ServerOptions),
    (   \+ option(silent(true), UserOptions)
    ->  print_message(informational,
                      httpd_started_server(Address, UserOptions))
    ;   true
    ).
http_server(_Goal, _:UserOptions) :-
    existence_error(server_address, UserOptions).

%!  server_address(-Address, +Options) is semidet.
%
%   Address is the port(Port) value if present, else
%   unix_socket(Path) if that option is present.  Fails otherwise.

server_address(Address, Options) :-
    (   option(port(Port), Options)
    ->  Address = Port
    ;   option(unix_socket(SocketPath), Options)
    ->  Address = unix_socket(SocketPath)
    ).

%!  address_port(+Address, -Port) is det.
%
%   Strip the interface from Interface:Port and the wrapper from
%   unix_socket(Path).  Anything else is passed unchanged.

address_port(_Interface:Port, Port) :-
    !.
address_port(unix_socket(SocketPath), SocketPath) :-
    !.
address_port(Address, Address) :-
    !.

%!  tcp_address(@Address) is semidet.
%
%   True if Address is unbound, an integer or an Interface:Port term.

tcp_address(Address) :-
    var(Address),
    !.
tcp_address(Address) :-
    integer(Address),
    !.
tcp_address(_Interface:_Port).

%!  address_domain(+Address, -Domain) is det.
%
%   Domain is `inet6` if the interface of Interface:Port resolves,
%   through ip_name/2, to an ip/8 term.  Domain is `inet` in all other
%   cases, including `localhost` and a lookup that raises an error.

address_domain(localhost:_Port, Domain) =>
    Domain = inet.
address_domain(Interface:_Port, Domain) =>
    (   catch(ip_name(Ip, Interface), error(_,_), fail),
        functor(Ip, ip, 8)
    ->  Domain = inet6
    ;   Domain = inet
    ).
address_domain(_, Domain) =>
    Domain = inet.
queue(QueueId)
.
%!  make_socket(+Address, :OptionsIn, -OptionsOut) is semidet.
%
%   Prepare the listen socket for Address.  OptionsOut extends the
%   user options with queue(Queue) and, unless the caller already
%   passed one, tcp_socket(Socket).  The clauses are tried in order:
%
%     1. make_socket_hook/3 for TCP addresses.
%     2. A tcp_socket(_) option supplied by the caller.
%     3. A freshly bound and listening TCP socket.
%     4. A Unix domain socket, if unix_domain_socket/1 is available.

make_socket(Address, Module:UserOptions, ServerOptions) :-
    tcp_address(Address),
    make_socket_hook(Address, Module:UserOptions, ServerOptions),
    !.
make_socket(Address, _:UserOptions, ServerOptions) :-
    option(tcp_socket(_), UserOptions),
    !,
    make_addr_atom('httpd', Address, Queue),
    ServerOptions = [queue(Queue)|UserOptions].
make_socket(Address, _:UserOptions, ServerOptions) :-
    tcp_address(Address),
    !,
    address_domain(Address, Domain),
    socket_create(Socket, [domain(Domain)]),
    tcp_setopt(Socket, reuseaddr),
    tcp_bind(Socket, Address),
    tcp_listen(Socket, 64),
    make_addr_atom('httpd', Address, Queue),
    ServerOptions = [queue(Queue), tcp_socket(Socket)|UserOptions].
:- if(current_predicate(unix_domain_socket/1)).
make_socket(Address, _:UserOptions, ServerOptions) :-
    Address = unix_socket(SocketPath),
    !,
    unix_domain_socket(Socket),
    tcp_bind(Socket, SocketPath),
    tcp_listen(Socket, 64),
    make_addr_atom('httpd', Address, Queue),
    ServerOptions = [queue(Queue), tcp_socket(Socket)|UserOptions].
:- endif.
%!  make_addr_atom(+Scheme, +Address, -Atom) is det.
%
%   Create an atom of the form Scheme@Address that identifies the
%   server, e.g. 'httpd@localhost:8080'.  It is used for the queue
%   name and for thread aliases.
%
%   @error instantiation_error if (part of) Address is unbound.
%   @error domain_error(http_server_address, Address) if Address has
%          an unsupported shape.

make_addr_atom(Scheme, Address, Atom) :-
    phrase(address_parts(Address), Parts),
    atomic_list_concat([Scheme,@|Parts], Atom).

%!  address_parts(+Address)// is det.
%
%   Emit the atomic components of Address.  Handles atomic values,
%   Host:Port, ip(A,B,C,D) and unix_socket(Path).

address_parts(Var) -->
    { var(Var),
      !,
      instantiation_error(Var)
    }.
address_parts(Atomic) -->
    { atomic(Atomic) },
    !,
    [Atomic].
address_parts(Host:Port) -->
    !,
    address_parts(Host), [:], address_parts(Port).
address_parts(ip(A,B,C,D)) -->
    !,
    [ A, '.', B, '.', C, '.', D ].
address_parts(unix_socket(Path)) -->
    !,              % commit: do not fall through to the error clause
    [Path].
address_parts(Address) -->
    { domain_error(http_server_address, Address) }.
%!  create_server(:Goal, +Address, +Options) is det.
%
%   Start the accept thread for a server and register it in
%   current_server/6.  The thread alias is Scheme@Port.  Registration
%   happens only after the accept thread has sent `server_started` to
%   the calling thread.

create_server(Goal, Address, Options) :-
    get_time(StartedAt),
    memberchk(queue(Queue), Options),
    scheme(Scheme, Options),
    autoload_https(Scheme),
    address_port(Address, Port),
    make_addr_atom(Scheme, Port, Alias),
    thread_self(Me),
    thread_create(accept_server(Goal, Me, Options), _,
                  [ alias(Alias)
                  ]),
    thread_get_message(server_started),
    assert(current_server(Port, Goal, Alias, Queue, Scheme, StartedAt)).

%!  scheme(-Scheme, +Options) is semidet.
%
%   Scheme is the value of the scheme(Scheme) option if given, `https`
%   if Options contains ssl(_) or ssl_instance(_) and `http` otherwise.

scheme(Scheme, Options) :-
    option(scheme(Scheme), Options),
    !.
scheme(Scheme, Options) :-
    (   option(ssl(_), Options)
    ;   option(ssl_instance(_), Options)
    ),
    !,
    Scheme = https.
scheme(http, _).

%!  autoload_https(+Scheme) is det.
%
%   For `https`, load library(http/http_ssl_plugin) if no clause for
%   accept_hook/2 exists yet and the library source can be found.
%   Succeeds without action in all other cases.

autoload_https(https) :-
    \+ clause(accept_hook(_Goal, _Options), _),
    exists_source(library(http/http_ssl_plugin)),
    !,
    use_module(library(http/http_ssl_plugin)).
autoload_https(_).
%!  http_current_server(:Goal, ?Port) is nondet.
%
%   True if Goal is the goal of a server registered at Port in
%   current_server/6.

http_current_server(Goal, Port) :-
    current_server(Port, Goal, _Thread, _Queue, _Scheme, _StartTime).
%!  http_server_property(?Port, ?Property) is nondet.
%
%   True if Property is a property of the server registered at Port.
%   A Port of the form _:Integer is reduced to the integer first.
%   Defined properties are:
%
%     - goal(:Goal)
%     - scheme(-Scheme): `http` or `https`.
%     - start_time(-Time)

http_server_property(_Interface:Port, Property) :-
    integer(Port),
    !,
    server_property(Property, Port).
http_server_property(Port, Property) :-
    server_property(Property, Port).

% server_property(?Property, ?Port): project one column of
% current_server/6.
server_property(goal(Goal), Port) :-
    current_server(Port, Goal, _Thread, _Queue, _Scheme, _StartTime).
server_property(scheme(Scheme), Port) :-
    current_server(Port, _Goal, _Thread, _Queue, Scheme, _StartTime).
server_property(start_time(StartTime), Port) :-
    current_server(Port, _Goal, _Thread, _Queue, _Scheme, StartTime).
%!  http_workers(+Port, ?Workers) is nondet.
%
%   Query or set the number of workers of the server at Port.  If
%   Workers is an integer the pool is resized to that number.
%   Otherwise Workers is unified with the number of queue_worker/2
%   facts for the queue of the server.
%
%   @error existence_error(http_server, Port) when resizing and no
%          server is registered at Port.

http_workers(Port, Count) :-
    integer(Count),
    !,
    must_be(ground, Port),
    (   current_server(Port, _Goal, _Thread, Queue, _Scheme, _Start)
    ->  resize_pool(Queue, Count)
    ;   existence_error(http_server, Port)
    ).
http_workers(Port, Count) :-
    current_server(Port, _Goal, _Thread, Queue, _Scheme, _Start),
    aggregate_all(count, queue_worker(Queue, _Worker), Count).
%!  http_add_worker(+Port, +Options) is det.
%
%   Add one worker to the server at Port.  Options are merged with the
%   options stored for the queue; values in Options take precedence.
%
%   @error existence_error(http_server, Port) if there is no server
%          at Port.

http_add_worker(Port, Options) :-
    must_be(ground, Port),
    current_server(Port, _Goal, _Thread, Queue, _Scheme, _Start),
    !,
    queue_options(Queue, StoredOptions),
    merge_options(Options, StoredOptions, WorkerOptions),
    atom_concat(Queue, '_', AliasPrefix),
    create_workers(1, 1, Queue, AliasPrefix, WorkerOptions).
http_add_worker(Port, _) :-
    existence_error(http_server, Port).
%!  http_current_worker(?Port, ?ThreadID) is nondet.
%
%   True if ThreadID is registered in queue_worker/2 for the queue of
%   the server at Port.

http_current_worker(Port, ThreadID) :-
    current_server(Port, _Goal, _Thread, Queue, _Scheme, _StartTime),
    queue_worker(Queue, ThreadID).
%!  accept_server(:Goal, +Initiator, +Options)
%
%   Main of the accept thread.  Runs the accept loop until the
%   exception http_stop(Stopper) arrives.  It then unregisters the
%   server, closes connections still pending in the queue, closes the
%   server socket and reports `http_stopped` to Stopper.

accept_server(Goal, Initiator, Options) :-
    StopSignal = http_stop(Stopper),
    catch(accept_server2(Goal, Initiator, Options), StopSignal, true),
    thread_self(Me),
    debug(http(stop), '[~p]: accept server received ~p', [Me, StopSignal]),
    retract(current_server(_Port, _, Me, Queue, _Scheme, _StartTime)),
    close_pending_accepts(Queue),
    close_server_socket(Options),
    thread_send_message(Stopper, http_stopped).

% accept_server2/3: notify Initiator that the server is up and run a
% failure-driven accept loop.  Exceptions accepted by
% accept_rethrow_error/1 are re-thrown; any other exception and
% failure of accept_server3/2 is printed, after which the loop
% continues.
accept_server2(Goal, Initiator, Options) :-
    thread_send_message(Initiator, server_started),
    repeat,
      (   catch(accept_server3(Goal, Options), Caught, true)
      ->  (   var(Caught)
          ->  fail
          ;   accept_rethrow_error(Caught)
          ->  throw(Caught)
          ;   print_message(error, Caught),
              fail
          )
      ;   print_message(error,      % internal error
                        goal_failed(accept_server3(Goal, Options))),
          fail
      ).

% accept_server3/2: accept one connection, either through accept_hook/2
% or by accepting on the tcp_socket(Socket) option and sending the
% client to the worker queue.
accept_server3(Goal, Options) :-
    accept_hook(Goal, Options),
    !.
accept_server3(Goal, Options) :-
    memberchk(tcp_socket(ListenSocket), Options),
    memberchk(queue(Queue), Options),
    debug(http(connection), 'Waiting for connection', []),
    tcp_accept(ListenSocket, ClientSocket, Peer),
    sig_atomic(send_to_worker(Queue, ClientSocket, Goal, Peer)),
    http_enough_workers(Queue, accept, Peer).

% send_to_worker/4: hand an accepted client to the workers.
send_to_worker(Queue, ClientSocket, Goal, Peer) :-
    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
    thread_send_message(Queue, tcp_client(ClientSocket, Goal, Peer)).

% accept_rethrow_error(+Exception): exceptions that end the accept loop.
accept_rethrow_error(http_stop(_)).
accept_rethrow_error('$aborted').
%!  close_server_socket(+Options) is semidet.
%
%   Close the server socket.  The close_hook/1 hook is tried first;
%   if it does not succeed, the socket from the tcp_socket(Socket)
%   option is closed.

close_server_socket(Options) :-
    close_hook(Options),
    !.
close_server_socket(Options) :-
    memberchk(tcp_socket(ListenSocket), Options),
    !,
    tcp_close_socket(ListenSocket).
%!  close_pending_accepts(+Queue) is det.
%
%   Drain Queue without waiting and close every client that was
%   accepted but not yet picked up by a worker.

close_pending_accepts(Queue) :-
    (   thread_get_message(Queue, Pending, [timeout(0)])
    ->  close_client(Pending),
        close_pending_accepts(Queue)
    ;   true
    ).

% close_client(+Message): close the socket of a tcp_client/3 message.
% Other messages are offered to discard_client_hook/1; a warning is
% printed if the hook does not handle them.
close_client(tcp_client(ClientSocket, _Goal, _0Peer)) =>
    debug(http(stop), 'Closing connection from ~p during shut-down', [_0Peer]),
    tcp_close_socket(ClientSocket).
close_client(Other) =>
    (   discard_client_hook(Other)
    ->  true
    ;   print_message(warning, http_close_client(Other))
    ).
%!  http_stop_server(+Port, +Options) is det.
%
%   Stop the server at Port.  Steps:
%
%     1. Resize the worker pool to 0.
%     2. Signal the accept thread with http_stop(Stopper).
%     3. Wait up to 0.1 seconds for `http_stopped`.  On timeout,
%        connect to localhost:Port, ignoring errors.
%     4. Join the accept thread and destroy the queue.
%
%   Options is currently ignored.  A Host:Port address with ground
%   Host is reduced to Port.

http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
    ground(Host),
    !,
    http_stop_server(Port, Options).
http_stop_server(Port, _Options) :-
    http_workers(Port, 0),                      % checks Port is ground
    current_server(Port, _, AcceptThread, Queue, _Scheme, _Start),
    retractall(queue_options(Queue, _)),
    debug(http(stop), 'Signalling HTTP server thread ~p to stop',
          [AcceptThread]),
    thread_self(Me),
    thread_signal(AcceptThread, throw(http_stop(Me))),
    (   thread_get_message(Me, http_stopped, [timeout(0.1)])
    ->  true
    ;   catch(connect(localhost:Port), _, true)
    ),
    thread_join(AcceptThread, _0Status),
    debug(http(stop), 'Joined HTTP server thread ~p (~p)',
          [AcceptThread, _0Status]),
    message_queue_destroy(Queue).

% connect(+Address): open a TCP connection to Address and close the
% socket again.
connect(Address) :-
    setup_call_cleanup(
        tcp_socket(Socket),
        tcp_connect(Socket, Address),
        tcp_close_socket(Socket)).
%!  http_enough_workers(+Queue, +Why, +Peer) is det.
%
%   Check whether Queue is served well enough.  Nothing is done if a
%   thread is waiting on the queue or if the queue size is acceptable
%   according to enough/2.  Otherwise the hook http:schedule_workers/1
%   is called with a dict holding `port`, `reason`, `peer`, `waiting`
%   and `queue`.  Errors raised by the hook are printed.

http_enough_workers(Queue, _Why, _Peer) :-
    message_queue_property(Queue, waiting(_0)),
    !,
    debug(http(scheduler), '~D waiting for work; ok', [_0]).
http_enough_workers(Queue, Why, Peer) :-
    message_queue_property(Queue, size(Pending)),
    (   enough(Pending, Why)
    ->  debug(http(scheduler), '~D in queue; ok', [Pending])
    ;   current_server(Port, _, _, Queue, _, _),
        Request = _{ port:Port,
                     reason:Why,
                     peer:Peer,
                     waiting:Pending,
                     queue:Queue
                   },
        debug(http(scheduler), 'Asking to reschedule: ~p', [Request]),
        catch(http:schedule_workers(Request),
              Error,
              print_message(error, Error))
    ->  true
    ;   true
    ).

% enough(+QueueSize, +Why): QueueSize is acceptable for reason Why.
enough(0, _).
enough(1, keep_alive).                  % I will be ready myself
accept
for a new connection or keep_alive
if a
worker tries to reschedule itself.
Note that, when called with reason:accept
, we are called in
the time critical main accept loop. An implementation of this
hook shall typically send the event to a thread dedicated to
dynamic worker-pool management.
626 /******************************* 627 * WORKER QUEUE OPERATIONS * 628 *******************************/
%!  create_workers(+Options) is det.
%
%   Create the message queue named by the queue(Queue) option and
%   start the number of workers given by workers(N), default 5.
%   Errors from creating the queue are ignored.  Options are stored
%   in queue_options/2 for workers that are added later.

create_workers(Options) :-
    option(workers(Count), Options, 5),
    option(queue(Queue), Options),
    catch(message_queue_create(Queue), _, true),
    atom_concat(Queue, '_', AliasPrefix),
    create_workers(1, Count, Queue, AliasPrefix, Options),
    assert(queue_options(Queue, Options)).

% create_workers(+Index, +Last, +Queue, +AliasPrefix, +Options)
%
% Start workers Index..Last.  Each worker runs http_worker/1, gets an
% alias generated from AliasPrefix and is recorded in queue_worker/2.
% Options is also passed to thread_create/3.
create_workers(Index, Last, _, _, _) :-
    Index > Last,
    !.
create_workers(Index, Last, Queue, AliasPrefix, Options) :-
    gensym(AliasPrefix, Alias),
    thread_create(http_worker(Options), WorkerId,
                  [ alias(Alias)
                  | Options
                  ]),
    assertz(queue_worker(Queue, WorkerId)),
    Next is Index + 1,
    create_workers(Next, Last, Queue, AliasPrefix, Options).
%!  resize_pool(+Queue, +Size) is semidet.
%
%   Bring the number of workers for Queue to Size.  Missing workers
%   are created using the stored queue options.  For each worker that
%   is too many, a quit(Me) message is posted on the queue and a
%   quitted(_) reply is awaited.

resize_pool(Queue, Size) :-
    findall(Worker, queue_worker(Queue, Worker), Current),
    length(Current, Count),
    (   Count < Size
    ->  queue_options(Queue, Options),
        atom_concat(Queue, '_', AliasPrefix),
        First is Count+1,
        create_workers(First, Size, Queue, AliasPrefix, Options)
    ;   Count == Size
    ->  true
    ;   Count > Size
    ->  Surplus is Count - Size,
        thread_self(Me),
        forall(between(1, Surplus, _),
               thread_send_message(Queue, quit(Me))),
        forall(between(1, Surplus, _),
               thread_get_message(quitted(_)))
    ).
If the message quit(Sender)
is read from the queue, the worker
stops.
%!  http_worker(+Options)
%
%   Main of a worker thread.  Registers done_worker/0 to run on
%   thread exit and processes jobs from the queue(Queue) option in a
%   failure-driven loop.  The message quit(Sender) ends the loop:
%   the worker detaches itself and, unless Sender is `idle`,
%   unregisters and replies quitted(Self).  Any other message is
%   opened as a client and processed.  Errors and failure of
%   http_process/4 are printed and the connection is closed.

http_worker(Options) :-
    debug(http(scheduler), 'New worker', []),
    prolog_listen(this_thread_exit, done_worker),
    option(queue(Queue), Options),
    option(max_idle_time(MaxIdle), Options, infinite),
    thread_repeat_wait(get_work(Queue, Job, MaxIdle)),
      debug(http(worker), 'Waiting for a job ...', []),
      debug(http(worker), 'Got job ~p', [Job]),
      (   Job = quit(Sender)
      ->  !,
          thread_self(Self),
          thread_detach(Self),
          (   Sender == idle
          ->  true
          ;   retract(queue_worker(Queue, Self)),
              thread_send_message(Sender, quitted(Self))
          )
      ;   open_client(Job, Queue, Goal, In, Out,
                      Options, ClientOptions),
          (   catch(http_process(Goal, In, Out, ClientOptions),
                    Error, true)
          ->  true
          ;   Error = goal_failed(http_process/4)
          ),
          (   var(Error)
          ->  fail
          ;   current_message_level(Error, Level),
              print_message(Level, Error),
              memberchk(peer(Peer), ClientOptions),
              close_connection(Peer, In, Out),
              fail
          )
      ).

%!  get_work(+Queue, -Message, +MaxIdle) is det.
%
%   Wait for the next Message on Queue.  If MaxIdle is not `infinite`
%   and no message arrives in MaxIdle seconds, Message is quit(idle).

get_work(Queue, Message, infinite) :-
    !,
    thread_get_message(Queue, Message).
get_work(Queue, Message, MaxIdle) :-
    (   thread_get_message(Queue, Message, [timeout(MaxIdle)])
    ->  true
    ;   Message = quit(idle)
    ).
%!  open_client(+Message, +Queue, -Goal, -In, -Out,
%!              +Options, -ClientOptions) is semidet.
%
%   Open the streams for a job taken from the queue.  A requeue/4
%   message carries the streams of a keep-alive connection; it is
%   accepted only if new input arrives within the
%   keep_alive_timeout(Seconds) option, default 2.  Other messages
%   are opened through open_client/6.  Errors from that call are
%   printed and cause failure.

open_client(requeue(In, Out, Goal, ClientOptions),
            _, Goal, In, Out, Options, ClientOptions) :-
    !,
    memberchk(peer(Peer), ClientOptions),
    option(keep_alive_timeout(KeepAlive), Options, 2),
    check_keep_alive_connection(In, KeepAlive, Peer, In, Out).
open_client(Message, Queue, Goal, In, Out, Options,
            [ pool(client(Queue, Goal, In, Out)),
              timeout(Timeout)
            | ConnectionOptions
            ]) :-
    catch(open_client(Message, Goal, In, Out, ConnectionOptions, Options),
          Error, report_error(Error)),
    option(timeout(Timeout), Options, 60),
    (   debugging(http(connection))
    ->  memberchk(peer(Peer), ConnectionOptions),
        debug(http(connection), 'Opened connection from ~p', [Peer])
    ;   true
    ).
%!  open_client(+Message, -Goal, -In, -Out,
%!              -ClientOptions, +Options) is semidet.
%
%   Open the I/O streams for a new connection.  The hook
%   open_client_hook/6 is tried first.  A plain tcp_client/3 message
%   is opened with tcp_open_socket/3 and described by peer(Peer) and
%   protocol(http).

open_client(Message, Goal, In, Out, ClientOptions, Options) :-
    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
    !.
open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
            [ peer(Peer),
              protocol(http)
            ], _) :-
    tcp_open_socket(Socket, In, Out).

% report_error(+Error): print Error as an error message and fail.
report_error(Error) :-
    print_message(error, Error),
    fail.
%!  check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet.
%
%   Wait for new input on a keep-alive connection.  The timeout of In
%   is set to TimeOut while peeking for the next character.  If data
%   is available the old timeout is restored and the call succeeds.
%   On end-of-file or an exception the connection is closed and the
%   call fails.

check_keep_alive_connection(In, TimeOut, Peer, In, Out) :-
    stream_property(In, timeout(SavedTimeOut)),
    set_stream(In, timeout(TimeOut)),
    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
    catch(peek_code(In, Next), Error, true),
    (   var(Error),                     % no exception
        Next \== -1                     % no end-of-file
    ->  set_stream(In, timeout(SavedTimeOut)),
        debug(http(keep_alive), '\tre-using keep-alive connection', [])
    ;   (   Next == -1
        ->  debug(http(keep_alive),
                  '\tRemote closed keep-alive connection', [])
        ;   debug(http(keep_alive),
                  '\tTimeout on keep-alive connection', [])
        ),
        close_connection(Peer, In, Out),
        fail
    ).
%!  done_worker
%
%   Exit hook of a worker thread.  If the worker is still registered
%   in queue_worker/2 it is unregistered and, depending on its exit
%   status, possibly replaced by a new worker.  A message is printed
%   about the restart or the stop.  The second clause handles workers
%   that already unregistered after reading quit(Sender).

done_worker :-
    thread_self(Me),
    thread_detach(Me),
    retract(queue_worker(Queue, Me)),
    thread_property(Me, status(ExitStatus)),
    !,
    (   catch(recreate_worker(ExitStatus, Queue), _, fail)
    ->  print_message(informational,
                      httpd_restarted_worker(Me))
    ;   done_status_message_level(ExitStatus, Level),
        print_message(Level,
                      httpd_stopped_worker(Me, ExitStatus))
    ).
done_worker :-                                  % received quit(Sender)
    thread_self(Me),
    thread_property(Me, status(ExitStatus)),
    done_status_message_level(ExitStatus, Level),
    print_message(Level,
                  httpd_stopped_worker(Me, ExitStatus)).

% done_status_message_level(+Status, -Level): normal termination and
% abort are reported silently, anything else as informational.
done_status_message_level(true, silent) :-
    !.
done_status_message_level(exception('$aborted'), silent) :-
    !.
done_status_message_level(_, informational).
The first clause deals with the possibility that we cannot write to
user_error
. This is possible when Prolog is started as a service
using some service managers. Would be nice if we could write an
error, but where?
%!  recreate_worker(+Status, +Queue) is semidet.
%
%   Start a replacement worker if the exit status of the old worker
%   is an exception accepted by recreate_on_error/1.  If the worker
%   died because it could not write to `user_error`, the process is
%   halted with status 2.

recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
    halt(2).
recreate_worker(exception(Reason), Queue) :-
    recreate_on_error(Reason),
    queue_options(Queue, Options),
    atom_concat(Queue, '_', AliasPrefix),
    create_workers(1, 1, Queue, AliasPrefix, Options).

% recreate_on_error(+Exception): exceptions after which a worker is
% replaced.
recreate_on_error('$aborted').
recreate_on_error(time_limit_exceeded).
%!  message_level(+Exception, -Level) is semidet.
%
%   Multifile table that maps an exception caught by a worker to the
%   level used for printing it.  Terms that are not listed are
%   printed as `error` by current_message_level/2.

:- multifile
    message_level/2.

message_level(error(io_error(read, _), _),               silent).
message_level(error(socket_error(epipe,_), _),           silent).
message_level(error(http_write_short(_Obj,_Written), _), silent).
message_level(error(timeout_error(read, _), _),          informational).
message_level(keep_alive_timeout,                        silent).

%!  current_message_level(+Term, -Level) is det.
%
%   Level is the first level message_level/2 defines for Term, or
%   `error` if there is none.

current_message_level(Term, Level) :-
    (   message_level(Term, Known)
    ->  Level = Known
    ;   Level = error
    ).
%!  http_requeue(+Header) is semidet.
%
%   Put the connection described by Header back on the worker queue
%   as a requeue/4 message, after checking that enough workers are
%   available.  Only the pool(_), peer(_) and protocol(_) fields of
%   Header are passed along.  If requeueing fails, a debug message is
%   printed on the topic http(error) and the call fails.

http_requeue(Header) :-
    requeue_header(Header, ClientOptions),
    memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
    memberchk(peer(Peer), ClientOptions),
    http_enough_workers(Queue, keep_alive, Peer),
    thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
    !.
http_requeue(Header) :-
    debug(http(error), 'Re-queue failed: ~p', [Header]),
    fail.

% requeue_header(+Header, -Kept): Kept holds the fields of Header that
% satisfy requeue_keep/1, in the original order.
requeue_header([], []).
requeue_header([Field|Fields], [Field|Kept]) :-
    requeue_keep(Field),
    !,
    requeue_header(Fields, Kept).
requeue_header([_|Fields], Kept) :-
    requeue_header(Fields, Kept).

% requeue_keep(?Field): fields that survive requeueing.
requeue_keep(pool(_)).
requeue_keep(peer(_)).
requeue_keep(protocol(_)).
%!  http_process(:Goal, +In, +Out, +Options)
%
%   Handle one request on the In/Out stream pair.  Both streams get
%   the timeout from the timeout(Seconds) option, default 60.  The
%   request is run through http_wrapper/5; next/2 then decides what
%   happens to the connection.

http_process(Goal, In, Out, Options) :-
    debug(http(server), 'Running server goal ~p on ~p -> ~p',
          [Goal, In, Out]),
    option(timeout(TimeOut), Options, 60),
    set_stream(In, timeout(TimeOut)),
    set_stream(Out, timeout(TimeOut)),
    http_wrapper(Goal, In, Out, Connection,
                 [ request(Request)
                 | Options
                 ]),
    next(Connection, Request).

% next(+Connection, +Request): act on the connection status.  Prints
% a warning if next_/2 fails.
next(Connection, Request) :-
    next_(Connection, Request),
    !.
next(Connection, Request) :-
    print_message(warning, goal_failed(next(Connection,Request))).

% next_(+Connection, +Request)
%
%   - switch_protocol(Goal, _): call Goal on the streams; close the
%     connection if Goal fails or raises (the error is printed).
%   - spawned(ThreadId): another thread handles the request.
%   - `keep-alive` (case insensitive): requeue the connection.
%   - Anything else, or a failed requeue: close the connection.
next_(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
    !,
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    (   catch(call(SwitchGoal, In, Out), Error,
              ( print_message(error, Error),
                fail))
    ->  true
    ;   http_close_connection(Request)
    ).
next_(spawned(ThreadId), _) :-
    !,
    debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
next_(Connection, Request) :-
    downcase_atom(Connection, 'keep-alive'),
    http_requeue(Request),
    !.
next_(_, Request) :-
    http_close_connection(Request).
%!  http_close_connection(+Request) is det.
%
%   Close the streams of the connection that Request arrived on.  The
%   streams come from the pool(client(_,_,In,Out)) field and the peer
%   from the peer(Peer) field of Request.

http_close_connection(Request) :-
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    memberchk(peer(Peer), Request),
    close_connection(Peer, In, Out).
%!  close_connection(+Peer, +In, +Out) is det.
%
%   Forcefully close both streams of a connection.  Exceptions raised
%   while closing are ignored.  Peer is used for debug output only.

close_connection(Peer, In, Out) :-
    debug(http(connection), 'Closing connection from ~p', [Peer]),
    catch(close(In,  [force(true)]), _, true),
    catch(close(Out, [force(true)]), _, true).
If a pool does not exist, this predicate calls the multifile hook create_pool/1 to create it. If this predicate succeeds the operation is retried.
%!  http_spawn(:Goal, +Options) is det.
%
%   Run Goal in a new detached thread that writes to the current
%   output (CGI) stream.  With the option pool(Pool) the thread is
%   created in that thread pool:
%
%     - If the pool is full, http_reply(busy) is thrown.
%     - If the pool does not exist and create_pool/1 succeeds, the
%       spawn is retried.
%     - Any other error is re-thrown.
%
%   Without pool(Pool), thread_create/3 is used and Options are passed
%   to it.

http_spawn(Goal, Options) :-
    select_option(pool(Pool), Options, ThreadOptions),
    !,
    current_output(CGI),
    catch(thread_create_in_pool(Pool,
                                wrap_spawned(CGI, Goal), ThreadId,
                                [ detached(true)
                                | ThreadOptions
                                ]),
          Error,
          true),
    (   var(Error)
    ->  http_spawned(ThreadId)
    ;   Error = error(resource_error(threads_in_pool(_)), _)
    ->  throw(http_reply(busy))
    ;   Error = error(existence_error(thread_pool, Pool), _),
        create_pool(Pool)
    ->  http_spawn(Goal, Options)
    ;   throw(Error)
    ).
http_spawn(Goal, Options) :-
    current_output(CGI),
    thread_create(wrap_spawned(CGI, Goal), ThreadId,
                  [ detached(true)
                  | Options
                  ]),
    http_spawned(ThreadId).

% wrap_spawned(+CGI, :Goal): main of the spawned thread.  Makes CGI
% the current output, runs Goal through http_wrap_spawned/3 and
% handles the resulting connection status with next/2.
wrap_spawned(CGI, Goal) :-
    set_output(CGI),
    http_wrap_spawned(Goal, Request, Connection),
    next(Connection, Request).
%!  create_pool(+Pool) is det.
%
%   Create a thread pool that http_spawn/2 found missing.  The hook
%   http:create_pool/1 is tried first; a permission error for creating
%   Pool raised by the hook is treated as success.  If the hook
%   fails, a pool of size 10 is created and an informational message
%   is printed.

create_pool(Pool) :-
    E = error(permission_error(create, thread_pool, Pool), _),
    catch(http:create_pool(Pool), E, true).
create_pool(Pool) :-
    print_message(informational, httpd(created_pool(Pool))),
    thread_pool_create(Pool, 10, []).


                /*******************************
                *        WAIT POLICIES         *
                *******************************/

% thread_repeat_wait/1 runs its argument through call/1, so the
% argument is a 0-arity meta-argument.
:- meta_predicate
    thread_repeat_wait(0).
%!  thread_repeat_wait(:Goal) is multi.
%
%   Acts as `repeat, thread_idle(Goal)`, choosing whether to use a
%   `long` or `short` idle time based on the average firing rate.
%   If the average time between invocations is below 0.05, Goal is
%   called directly without thread_idle/2.

thread_repeat_wait(Goal) :-
    new_rate_mma(5, 1000, RateState),
    repeat,
      update_rate_mma(RateState, AvgInterval),
      long(AvgInterval, IdleMode),
      (   IdleMode == brief
      ->  call(Goal)
      ;   thread_idle(Goal, IdleMode)
      ).

% long(+AvgInterval, -IdleMode): classify the average interval as
% `brief` (< 0.05), `short` (< 1) or `long`.
long(AvgInterval, brief) :-
    AvgInterval < 0.05,
    !.
long(AvgInterval, short) :-
    AvgInterval < 1,
    !.
long(_, long).
%!  new_rate_mma(+N, +Resolution, -State) is det.
%!  update_rate_mma(!State, -MMA) is det.
%
%   Maintain a modified moving average over N samples of the time
%   between calls to update_rate_mma/2.  Time stamps are stored as
%   integers in units of 1/Resolution seconds relative to a base
%   time; MMA is returned in seconds.  State is updated destructively
%   using nb_setarg/3.

new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :-
    current_prolog_flag(max_tagged_integer, MaxI),
    get_time(Base).

update_rate_mma(State, MMAr) :-
    State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
    get_time(Now),
    Stamp is round((Now-Base)*Resolution),
    Diff is Stamp-Last,
    (   Stamp > MaxI
    ->  % Rebase.  The last stamp must become 0 relative to the new
        % base; storing Stamp here would make the next Diff negative.
        nb_setarg(1, State, Now),
        nb_setarg(2, State, 0)
    ;   nb_setarg(2, State, Stamp)
    ),
    MMA is round(((N-1)*MMA0+Diff)/N),
    nb_setarg(6, State, MMA),
    MMAr is MMA/float(Resolution).


                /*******************************
                *           MESSAGES           *
                *******************************/

:- multifile
    prolog:message/3.

% Message translations.  These must be clauses of the multifile hook
% prolog:message//1 to be picked up by print_message/2.
prolog:message(httpd_started_server(Port, Options)) -->
    [ 'Started server at '-[] ],
    http_root(Port, Options).
prolog:message(httpd_stopped_worker(Self, Status)) -->
    [ 'Stopped worker ~p: ~p'-[Self, Status] ].
prolog:message(httpd_restarted_worker(Self)) -->
    [ 'Replaced aborted worker ~p'-[Self] ].
prolog:message(httpd(created_pool(Pool))) -->
    [ 'Created thread-pool ~p of size 10'-[Pool], nl,
      'Create this pool at startup-time or define the hook ', nl,
      'http:create_pool/1 to avoid this message and create a ', nl,
      'pool that fits the usage-profile.'
    ].

% http_root(+Address, +Options)//: emit the landing page of the server.
http_root(Address, Options) -->
    { landing_page(Address, URI, Options) },
    [ '~w'-[URI] ].

%!  landing_page(+Address, -URI, +Options) is det.
%
%   URI describes where the server at Address can be reached.  For
%   Host:Port this is Scheme://Host[:Port] resolved against the entry
%   page; the port is omitted if it is the default for the scheme.
%   A plain Port is handled as localhost:Port.  For a Unix domain
%   socket, URI is a descriptive string.

landing_page(Host:Port, URI, Options) :-
    !,
    must_be(atom, Host),
    must_be(integer, Port),
    http_server_property(Port, scheme(Scheme)),
    (   default_port(Scheme, Port)
    ->  format(atom(Base), '~w://~w', [Scheme, Host])
    ;   format(atom(Base), '~w://~w:~w', [Scheme, Host, Port])
    ),
    entry_page(Base, URI, Options).
landing_page(unix_socket(Path), URI, _Options) :-
    !,
    format(string(URI), 'Unix domain socket "~w"', [Path]).
landing_page(Port, URI, Options) :-
    landing_page(localhost:Port, URI, Options).

% default_port(?Scheme, ?Port): well-known port of a scheme.
default_port(http, 80).
default_port(https, 443).

% entry_page(+Base, -URI, +Options): resolve the entry_page(Entry)
% option, or else the HTTP location root(.), against Base.
entry_page(Base, URI, Options) :-
    option(entry_page(Entry), Options),
    !,
    uri_resolve(Entry, Base, URI).
entry_page(Base, URI, _) :-
    http_absolute_location(root(.), Entry, []),
    uri_resolve(Entry, Base, URI).
Threaded HTTP server
Most code doesn't need to use this directly; instead use library(http/http_server), which combines this library with the typical HTTP libraries that most servers need.
This library defines the HTTP server frontend of choice for SWI-Prolog. It is based on the multi-threading capabilities of SWI-Prolog and thus exploits multiple cores to serve requests concurrently. The server scales well and can cooperate with library(thread_pool) to control the number of concurrent requests of a given type. For example, it can be configured to handle 200 file download requests concurrently, 2 requests that potentially use a lot of memory and 8 requests that use a lot of CPU resources.
On Unix systems, this library can be combined with library(http/http_unix_daemon) to realise a proper Unix service process that creates a web server at port 80, runs under a specific account, optionally detaches from the controlling terminal, etc.
Combined with library(http/http_ssl_plugin) from the SSL package, this library can be used to create an HTTPS server. See <plbase>/doc/packages/examples/ssl/https for an example server using a self-signed SSL certificate. */