1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2002-2024, University of Amsterdam 7 VU University Amsterdam 8 CWI, Amsterdam 9 SWI-Prolog Solutions b.v. 10 All rights reserved. 11 12 Redistribution and use in source and binary forms, with or without 13 modification, are permitted provided that the following conditions 14 are met: 15 16 1. Redistributions of source code must retain the above copyright 17 notice, this list of conditions and the following disclaimer. 18 19 2. Redistributions in binary form must reproduce the above copyright 20 notice, this list of conditions and the following disclaimer in 21 the documentation and/or other materials provided with the 22 distribution. 23 24 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 32 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 33 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 34 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 POSSIBILITY OF SUCH DAMAGE. 36*/ 37 38:- module(thread_httpd, 39 [ http_current_server/2, % ?:Goal, ?Port 40 http_server_property/2, % ?Port, ?Property 41 http_server/2, % :Goal, +Options 42 http_workers/2, % +Port, ?WorkerCount 43 http_add_worker/2, % +Port, +Options 44 http_current_worker/2, % ?Port, ?ThreadID 45 http_stop_server/2, % +Port, +Options 46 http_spawn/2, % :Goal, +Options 47 48 http_requeue/1, % +Request 49 http_close_connection/1, % +Request 50 http_enough_workers/3 % +Queue, +Why, +Peer 51 ]). 52:- use_module(library(debug)). 53:- use_module(library(error)). 54:- use_module(library(option)). 55:- use_module(library(socket)). 56:- use_module(library(thread_pool)). 57:- use_module(library(gensym)). 58:- use_module(http_wrapper). 59:- use_module(http_path). 60 61:- autoload(library(uri), [uri_resolve/3]). 62:- autoload(library(aggregate), [aggregate_all/3]). 63 64:- predicate_options(http_server/2, 2, 65 [ port(any), 66 unix_socket(atom), 67 entry_page(atom), 68 tcp_socket(any), 69 workers(positive_integer), 70 timeout(number), 71 keep_alive_timeout(number), 72 silent(boolean), 73 ssl(list(any)), % if http/http_ssl_plugin is loaded 74 pass_to(system:thread_create/3, 3) 75 ]). 76:- predicate_options(http_spawn/2, 2, 77 [ pool(atom), 78 pass_to(system:thread_create/3, 3), 79 pass_to(thread_pool:thread_create_in_pool/4, 4) 80 ]). 81:- predicate_options(http_add_worker/2, 2, 82 [ timeout(number), 83 keep_alive_timeout(number), 84 max_idle_time(number), 85 pass_to(system:thread_create/3, 3) 86 ]).
114:- meta_predicate 115 http_server( , ), 116 http_current_server( , ), 117 http_spawn( , ). 118 119:- dynamic 120 current_server/6, % Port, Goal, Thread, Queue, Scheme, StartTime 121 queue_worker/2, % Queue, ThreadID 122 queue_options/2. % Queue, Options 123 124:- multifile 125 make_socket_hook/3, 126 accept_hook/2, 127 close_hook/1, 128 open_client_hook/6, 129 discard_client_hook/1, 130 http:create_pool/1, 131 http:schedule_workers/1. 132 133:- meta_predicate 134 thread_repeat_wait( ).
AF_UNIX
). See socket_create/2.main
thread.
If you need to control resource usage you may consider the
spawn
option of http_handler/3 and library(thread_pool).true
(default false
), do not print an informational
message that the server was started.A typical initialization for an HTTP server that uses http_dispatch/1 to relay requests to predicates is:
:- use_module(library(http/thread_httpd)). :- use_module(library(http/http_dispatch)). start_server(Port) :- http_server(http_dispatch, [port(Port)]).
Note that multiple servers can coexist in the same Prolog process. A notable application of this is to have both an HTTP and HTTPS server, where the HTTP server redirects to the HTTPS server for handling sensitive requests.
200http_server(Goal, M:Options0) :- 201 server_address(Address, Options0), 202 !, 203 make_socket(Address, M:Options0, Options), 204 create_workers(Options), 205 create_server(Goal, Address, Options), 206 ( option(silent(true), Options0) 207 -> true 208 ; print_message(informational, 209 httpd_started_server(Address, Options0)) 210 ). 211http_server(_Goal, _:Options0) :- 212 existence_error(server_address, Options0). 213 214server_address(Address, Options) :- 215 ( option(port(Port), Options) 216 -> Address = Port 217 ; option(unix_socket(Path), Options) 218 -> Address = unix_socket(Path) 219 ). 220 221address_port(_IFace:Port, Port) :- !. 222address_port(unix_socket(Path), Path) :- !. 223address_port(Address, Address) :- !. 224 225tcp_address(Port) :- 226 var(Port), 227 !. 228tcp_address(Port) :- 229 integer(Port), 230 !. 231tcp_address(_Iface:_Port). 232 233address_domain(localhost:_Port, Domain) => 234 Domain = inet. 235address_domain(Iface:_Port, Domain) => 236 ( catch(ip_name(IP, Iface), error(_,_), fail), 237 functor(IP, ip, 8) 238 -> Domain = inet6 239 ; Domain = inet 240 ). 241address_domain(_, Domain) => 242 Domain = inet.
queue(QueueId)
.
253make_socket(Address, M:Options0, Options) :- 254 tcp_address(Address), 255 make_socket_hook(Address, M:Options0, Options), 256 !. 257make_socket(Address, _:Options0, Options) :- 258 option(tcp_socket(_), Options0), 259 !, 260 make_addr_atom('httpd', Address, Queue), 261 Options = [ queue(Queue) 262 | Options0 263 ]. 264make_socket(Address, _:Options0, Options) :- 265 tcp_address(Address), 266 !, 267 address_domain(Address, Domain), 268 socket_create(Socket, [domain(Domain)]), 269 tcp_setopt(Socket, reuseaddr), 270 tcp_bind(Socket, Address), 271 tcp_listen(Socket, 64), 272 make_addr_atom('httpd', Address, Queue), 273 Options = [ queue(Queue), 274 tcp_socket(Socket) 275 | Options0 276 ]. 277:- if(current_predicate(unix_domain_socket/1)). 278make_socket(Address, _:Options0, Options) :- 279 Address = unix_socket(Path), 280 !, 281 unix_domain_socket(Socket), 282 tcp_bind(Socket, Path), 283 tcp_listen(Socket, 64), 284 make_addr_atom('httpd', Address, Queue), 285 Options = [ queue(Queue), 286 tcp_socket(Socket) 287 | Options0 288 ]. 289:- endif.
296make_addr_atom(Scheme, Address, Atom) :- 297 phrase(address_parts(Address), Parts), 298 atomic_list_concat([Scheme,@|Parts], Atom). 299 300address_parts(Var) --> 301 { var(Var), 302 !, 303 instantiation_error(Var) 304 }. 305address_parts(Atomic) --> 306 { atomic(Atomic) }, 307 !, 308 [Atomic]. 309address_parts(Host:Port) --> 310 !, 311 address_parts(Host), [:], address_parts(Port). 312address_parts(ip(A,B,C,D)) --> 313 !, 314 [ A, '.', B, '.', C, '.', D ]. 315address_parts(unix_socket(Path)) --> 316 [Path]. 317address_parts(Address) --> 318 { domain_error(http_server_address, Address) }.
326create_server(Goal, Address, Options) :- 327 get_time(StartTime), 328 memberchk(queue(Queue), Options), 329 scheme(Scheme, Options), 330 autoload_https(Scheme), 331 address_port(Address, Port), 332 make_addr_atom(Scheme, Port, Alias), 333 thread_self(Initiator), 334 thread_create(accept_server(Goal, Initiator, Options), _, 335 [ alias(Alias) 336 ]), 337 thread_get_message(server_started), 338 assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)). 339 340scheme(Scheme, Options) :- 341 option(scheme(Scheme), Options), 342 !. 343scheme(Scheme, Options) :- 344 ( option(ssl(_), Options) 345 ; option(ssl_instance(_), Options) 346 ), 347 !, 348 Scheme = https. 349scheme(http, _). 350 351autoload_https(https) :- 352 \+ clause(accept_hook(_Goal, _Options), _), 353 exists_source(library(http/http_ssl_plugin)), 354 !, 355 use_module(library(http/http_ssl_plugin)). 356autoload_https(_).
364http_current_server(Goal, Port) :-
365 current_server(Port, Goal, _, _, _, _).
http
or https
.381http_server_property(_:Port, Property) :- 382 integer(Port), 383 !, 384 server_property(Property, Port). 385http_server_property(Port, Property) :- 386 server_property(Property, Port). 387 388server_property(goal(Goal), Port) :- 389 current_server(Port, Goal, _, _, _, _). 390server_property(scheme(Scheme), Port) :- 391 current_server(Port, _, _, _, Scheme, _). 392server_property(start_time(Time), Port) :- 393 current_server(Port, _, _, _, _, Time).
406http_workers(Port, Workers) :- 407 integer(Workers), 408 !, 409 must_be(ground, Port), 410 ( current_server(Port, _, _, Queue, _, _) 411 -> resize_pool(Queue, Workers) 412 ; existence_error(http_server, Port) 413 ). 414http_workers(Port, Workers) :- 415 current_server(Port, _, _, Queue, _, _), 416 aggregate_all(count, queue_worker(Queue, _Worker), Workers).
429http_add_worker(Port, Options) :- 430 must_be(ground, Port), 431 current_server(Port, _, _, Queue, _, _), 432 !, 433 queue_options(Queue, QueueOptions), 434 merge_options(Options, QueueOptions, WorkerOptions), 435 atom_concat(Queue, '_', AliasBase), 436 create_workers(1, 1, Queue, AliasBase, WorkerOptions). 437http_add_worker(Port, _) :- 438 existence_error(http_server, Port).
448http_current_worker(Port, ThreadID) :-
449 current_server(Port, _, _, Queue, _, _),
450 queue_worker(Queue, ThreadID).
458accept_server(Goal, Initiator, Options) :- 459 Ex = http_stop(Stopper), 460 catch(accept_server2(Goal, Initiator, Options), Ex, true), 461 thread_self(Thread), 462 debug(http(stop), '[~p]: accept server received ~p', [Thread, Ex]), 463 retract(current_server(_Port, _, Thread, Queue, _Scheme, _StartTime)), 464 close_pending_accepts(Queue), 465 close_server_socket(Options), 466 thread_send_message(Stopper, http_stopped). 467 468accept_server2(Goal, Initiator, Options) :- 469 thread_send_message(Initiator, server_started), 470 repeat, 471 ( catch(accept_server3(Goal, Options), E, true) 472 -> ( var(E) 473 -> fail 474 ; accept_rethrow_error(E) 475 -> throw(E) 476 ; print_message(error, E), 477 fail 478 ) 479 ; print_message(error, % internal error 480 goal_failed(accept_server3(Goal, Options))), 481 fail 482 ). 483 484accept_server3(Goal, Options) :- 485 accept_hook(Goal, Options), 486 !. 487accept_server3(Goal, Options) :- 488 memberchk(tcp_socket(Socket), Options), 489 memberchk(queue(Queue), Options), 490 debug(http(connection), 'Waiting for connection', []), 491 tcp_accept(Socket, Client, Peer), 492 sig_atomic(send_to_worker(Queue, Client, Goal, Peer)), 493 http_enough_workers(Queue, accept, Peer). 494 495send_to_worker(Queue, Client, Goal, Peer) :- 496 debug(http(connection), 'New HTTP connection from ~p', [Peer]), 497 thread_send_message(Queue, tcp_client(Client, Goal, Peer)). 498 499accept_rethrow_error(http_stop(_)).
505close_server_socket(Options) :- 506 close_hook(Options), 507 !. 508close_server_socket(Options) :- 509 memberchk(tcp_socket(Socket), Options), 510 !, 511 tcp_close_socket(Socket).
515close_pending_accepts(Queue) :- 516 ( thread_get_message(Queue, Msg, [timeout(0)]) 517 -> close_client(Msg), 518 close_pending_accepts(Queue) 519 ; true 520 ). 521 522close_client(tcp_client(Client, _Goal, _0Peer)) => 523 debug(http(stop), 'Closing connection from ~p during shut-down', [_0Peer]), 524 tcp_close_socket(Client). 525close_client(Msg) => 526 ( discard_client_hook(Msg) 527 -> true 528 ; print_message(warning, http_close_client(Msg)) 529 ).
539http_stop_server(Host:Port, Options) :- % e.g., localhost:4000 540 ground(Host), 541 !, 542 http_stop_server(Port, Options). 543http_stop_server(Port, _Options) :- 544 http_workers(Port, 0), % checks Port is ground 545 current_server(Port, _, Thread, Queue, _Scheme, _Start), 546 retractall(queue_options(Queue, _)), 547 debug(http(stop), 'Signalling HTTP server thread ~p to stop', [Thread]), 548 thread_self(Stopper), 549 thread_signal(Thread, throw(http_stop(Stopper))), 550 ( thread_get_message(Stopper, http_stopped, [timeout(0.1)]) 551 -> true 552 ; catch(connect(localhost:Port), _, true) 553 ), 554 thread_join(Thread, _0Status), 555 debug(http(stop), 'Joined HTTP server thread ~p (~p)', [Thread, _0Status]), 556 message_queue_destroy(Queue). 557 558connect(Address) :- 559 setup_call_cleanup( 560 tcp_socket(Socket), 561 tcp_connect(Socket, Address), 562 tcp_close_socket(Socket)).
570http_enough_workers(Queue, _Why, _Peer) :- 571 message_queue_property(Queue, waiting(_0)), 572 !, 573 debug(http(scheduler), '~D waiting for work; ok', [_0]). 574http_enough_workers(Queue, Why, Peer) :- 575 message_queue_property(Queue, size(Size)), 576 ( enough(Size, Why) 577 -> debug(http(scheduler), '~D in queue; ok', [Size]) 578 ; current_server(Port, _, _, Queue, _, _), 579 Data = _{ port:Port, 580 reason:Why, 581 peer:Peer, 582 waiting:Size, 583 queue:Queue 584 }, 585 debug(http(scheduler), 'Asking to reschedule: ~p', [Data]), 586 catch(http:schedule_workers(Data), 587 Error, 588 print_message(error, Error)) 589 -> true 590 ; true 591 ). 592 593enough(0, _). 594enough(1, keep_alive). % I will be ready myself
accept
for a new connection or keep_alive
if a
worker tries to reschedule itself.
Note that, when called with reason:accept
, we are called in
the time critical main accept loop. An implementation of this
hook shall typically send the event to thread dedicated to
dynamic worker-pool management.
624 /******************************* 625 * WORKER QUEUE OPERATIONS * 626 *******************************/
633create_workers(Options) :- 634 option(workers(N), Options, 5), 635 option(queue(Queue), Options), 636 catch(message_queue_create(Queue), _, true), 637 atom_concat(Queue, '_', AliasBase), 638 create_workers(1, N, Queue, AliasBase, Options), 639 assert(queue_options(Queue, Options)). 640 641create_workers(I, N, _, _, _) :- 642 I > N, 643 !. 644create_workers(I, N, Queue, AliasBase, Options) :- 645 gensym(AliasBase, Alias), 646 thread_create(http_worker(Options), Id, 647 [ alias(Alias) 648 | Options 649 ]), 650 assertz(queue_worker(Queue, Id)), 651 I2 is I + 1, 652 create_workers(I2, N, Queue, AliasBase, Options).
660resize_pool(Queue, Size) :-
661 findall(W, queue_worker(Queue, W), Workers),
662 length(Workers, Now),
663 ( Now < Size
664 -> queue_options(Queue, Options),
665 atom_concat(Queue, '_', AliasBase),
666 I0 is Now+1,
667 create_workers(I0, Size, Queue, AliasBase, Options)
668 ; Now == Size
669 -> true
670 ; Now > Size
671 -> Excess is Now - Size,
672 thread_self(Me),
673 forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
674 forall(between(1, Excess, _), thread_get_message(quitted(_)))
675 ).
If the message quit(Sender)
is read from the queue, the worker
stops.
686http_worker(Options) :- 687 debug(http(scheduler), 'New worker', []), 688 prolog_listen(this_thread_exit, done_worker), 689 option(queue(Queue), Options), 690 option(max_idle_time(MaxIdle), Options, infinite), 691 thread_repeat_wait(get_work(Queue, Message, MaxIdle)), 692 debug(http(worker), 'Waiting for a job ...', []), 693 debug(http(worker), 'Got job ~p', [Message]), 694 ( Message = quit(Sender) 695 -> !, 696 thread_self(Self), 697 thread_detach(Self), 698 ( Sender == idle 699 -> true 700 ; retract(queue_worker(Queue, Self)), 701 thread_send_message(Sender, quitted(Self)) 702 ) 703 ; open_client(Message, Queue, Goal, In, Out, 704 Options, ClientOptions), 705 ( catch(http_process(Goal, In, Out, ClientOptions), 706 Error, true) 707 -> true 708 ; Error = goal_failed(http_process/4) 709 ), 710 ( var(Error) 711 -> fail 712 ; current_message_level(Error, Level), 713 print_message(Level, Error), 714 memberchk(peer(Peer), ClientOptions), 715 close_connection(Peer, In, Out), 716 fail 717 ) 718 ). 719 720get_work(Queue, Message, infinite) :- 721 !, 722 thread_get_message(Queue, Message). 723get_work(Queue, Message, MaxIdle) :- 724 ( thread_get_message(Queue, Message, [timeout(MaxIdle)]) 725 -> true 726 ; Message = quit(idle) 727 ).
736open_client(requeue(In, Out, Goal, ClOpts), 737 _, Goal, In, Out, Opts, ClOpts) :- 738 !, 739 memberchk(peer(Peer), ClOpts), 740 option(keep_alive_timeout(KeepAliveTMO), Opts, 2), 741 check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out). 742open_client(Message, Queue, Goal, In, Out, Opts, 743 [ pool(client(Queue, Goal, In, Out)), 744 timeout(Timeout) 745 | Options 746 ]) :- 747 catch(open_client(Message, Goal, In, Out, Options, Opts), 748 E, report_error(E)), 749 option(timeout(Timeout), Opts, 60), 750 ( debugging(http(connection)) 751 -> memberchk(peer(Peer), Options), 752 debug(http(connection), 'Opened connection from ~p', [Peer]) 753 ; true 754 ).
760open_client(Message, Goal, In, Out, ClientOptions, Options) :- 761 open_client_hook(Message, Goal, In, Out, ClientOptions, Options), 762 !. 763open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out, 764 [ peer(Peer), 765 protocol(http) 766 ], _) :- 767 tcp_open_socket(Socket, In, Out). 768 769report_error(E) :- 770 print_message(error, E), 771 fail.
780check_keep_alive_connection(In, TMO, Peer, In, Out) :-
781 stream_property(In, timeout(Old)),
782 set_stream(In, timeout(TMO)),
783 debug(http(keep_alive), 'Waiting for keep-alive ...', []),
784 catch(peek_code(In, Code), E, true),
785 ( var(E), % no exception
786 Code \== -1 % no end-of-file
787 -> set_stream(In, timeout(Old)),
788 debug(http(keep_alive), '\tre-using keep-alive connection', [])
789 ; ( Code == -1
790 -> debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
791 ; debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
792 ),
793 close_connection(Peer, In, Out),
794 fail
795 ).
804done_worker :- 805 thread_self(Self), 806 thread_detach(Self), 807 retract(queue_worker(Queue, Self)), 808 thread_property(Self, status(Status)), 809 !, 810 ( catch(recreate_worker(Status, Queue), _, fail) 811 -> print_message(informational, 812 httpd_restarted_worker(Self)) 813 ; done_status_message_level(Status, Level), 814 print_message(Level, 815 httpd_stopped_worker(Self, Status)) 816 ). 817done_worker :- % received quit(Sender) 818 thread_self(Self), 819 thread_property(Self, status(Status)), 820 done_status_message_level(Status, Level), 821 print_message(Level, 822 httpd_stopped_worker(Self, Status)). 823 824done_status_message_level(true, silent) :- !. 825done_status_message_level(exception('$aborted'), silent) :- !. 826done_status_message_level(exception(unwind(abort)), silent) :- !. 827done_status_message_level(exception(unwind(halt(_))), silent) :- !. 828done_status_message_level(_, informational).
The first clause deals with the possibility that we cannot write to
user_error
. This is possible when Prolog is started as a service
using some service managers. Would be nice if we could write an
error, but where?
843recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :- 844 halt(2). 845recreate_worker(exception(Error), Queue) :- 846 recreate_on_error(Error), 847 queue_options(Queue, Options), 848 atom_concat(Queue, '_', AliasBase), 849 create_workers(1, 1, Queue, AliasBase, Options). 850 851recreate_on_error('$aborted'). 852recreate_on_error(unwind(abort)). 853recreate_on_error(time_limit_exceeded).
862:- multifile 863 message_level/2. 864 865message_level(error(io_error(read, _), _), silent). 866message_level(error(socket_error(epipe,_), _), silent). 867message_level(error(http_write_short(_Obj,_Written), _), silent). 868message_level(error(timeout_error(read, _), _), informational). 869message_level(keep_alive_timeout, silent). 870 871current_message_level(Term, Level) :- 872 ( message_level(Term, Level) 873 -> true 874 ; Level = error 875 ).
883http_requeue(Header) :- 884 requeue_header(Header, ClientOptions), 885 memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions), 886 memberchk(peer(Peer), ClientOptions), 887 http_enough_workers(Queue, keep_alive, Peer), 888 thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)), 889 !. 890http_requeue(Header) :- 891 debug(http(error), 'Re-queue failed: ~p', [Header]), 892 fail. 893 894requeue_header([], []). 895requeue_header([H|T0], [H|T]) :- 896 requeue_keep(H), 897 !, 898 requeue_header(T0, T). 899requeue_header([_|T0], T) :- 900 requeue_header(T0, T). 901 902requeue_keep(pool(_)). 903requeue_keep(peer(_)). 904requeue_keep(protocol(_)).
911http_process(Goal, In, Out, Options) :- 912 debug(http(server), 'Running server goal ~p on ~p -> ~p', 913 [Goal, In, Out]), 914 option(timeout(TMO), Options, 60), 915 set_stream(In, timeout(TMO)), 916 set_stream(Out, timeout(TMO)), 917 http_wrapper(Goal, In, Out, Connection, 918 [ request(Request) 919 | Options 920 ]), 921 next(Connection, Request). 922 923next(Connection, Request) :- 924 next_(Connection, Request), !. 925next(Connection, Request) :- 926 print_message(warning, goal_failed(next(Connection,Request))). 927 928next_(switch_protocol(SwitchGoal, _SwitchOptions), Request) :- 929 !, 930 memberchk(pool(client(_Queue, _Goal, In, Out)), Request), 931 ( catch(call(SwitchGoal, In, Out), E, 932 ( print_message(error, E), 933 fail)) 934 -> true 935 ; http_close_connection(Request) 936 ). 937next_(spawned(ThreadId), _) :- 938 !, 939 debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]). 940next_(Connection, Request) :- 941 downcase_atom(Connection, 'keep-alive'), 942 http_requeue(Request), 943 !. 944next_(_, Request) :- 945 http_close_connection(Request).
952http_close_connection(Request) :-
953 memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
954 memberchk(peer(Peer), Request),
955 close_connection(Peer, In, Out).
962close_connection(Peer, In, Out) :-
963 debug(http(connection), 'Closing connection from ~p', [Peer]),
964 catch(close(In, [force(true)]), _, true),
965 catch(close(Out, [force(true)]), _, true).
If a pool does not exist, this predicate calls the multifile hook create_pool/1 to create it. If this predicate succeeds the operation is retried.
983http_spawn(Goal, Options) :- 984 select_option(pool(Pool), Options, ThreadOptions), 985 !, 986 current_output(CGI), 987 catch(thread_create_in_pool(Pool, 988 wrap_spawned(CGI, Goal), Id, 989 [ detached(true) 990 | ThreadOptions 991 ]), 992 Error, 993 true), 994 ( var(Error) 995 -> http_spawned(Id) 996 ; Error = error(resource_error(threads_in_pool(_)), _) 997 -> throw(http_reply(busy)) 998 ; Error = error(existence_error(thread_pool, Pool), _), 999 create_pool(Pool) 1000 -> http_spawn(Goal, Options) 1001 ; throw(Error) 1002 ). 1003http_spawn(Goal, Options) :- 1004 current_output(CGI), 1005 thread_create(wrap_spawned(CGI, Goal), Id, 1006 [ detached(true) 1007 | Options 1008 ]), 1009 http_spawned(Id). 1010 1011wrap_spawned(CGI, Goal) :- 1012 set_output(CGI), 1013 http_wrap_spawned(Goal, Request, Connection), 1014 next(Connection, Request).
1024create_pool(Pool) :- 1025 E = error(permission_error(create, thread_pool, Pool), _), 1026 catch(http:create_pool(Pool), E, true). 1027create_pool(Pool) :- 1028 print_message(informational, httpd(created_pool(Pool))), 1029 thread_pool_create(Pool, 10, []). 1030 1031 1032 /******************************* 1033 * WAIT POLICIES * 1034 *******************************/ 1035 1036:- meta_predicate 1037 thread_repeat_wait( ).
repeat, thread_idle(Goal)
, choosing whether to use a
long
or short
idle time based on the average firing rate.1044thread_repeat_wait(Goal) :- 1045 new_rate_mma(5, 1000, State), 1046 repeat, 1047 update_rate_mma(State, MMA), 1048 long(MMA, IsLong), 1049 ( IsLong == brief 1050 -> call(Goal) 1051 ; thread_idle(Goal, IsLong) 1052 ). 1053 1054long(MMA, brief) :- 1055 MMA < 0.05, 1056 !. 1057long(MMA, short) :- 1058 MMA < 1, 1059 !. 1060long(_, long).
1074new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :- 1075 current_prolog_flag(max_tagged_integer, MaxI), 1076 get_time(Base). 1077 1078update_rate_mma(State, MMAr) :- 1079 State = rstate(Base, Last, MaxI, Resolution, N, MMA0), 1080 get_time(Now), 1081 Stamp is round((Now-Base)*Resolution), 1082 ( Stamp > MaxI 1083 -> nb_setarg(1, State, Now), 1084 nb_setarg(2, State, 0) 1085 ; true 1086 ), 1087 Diff is Stamp-Last, 1088 nb_setarg(2, State, Stamp), 1089 MMA is round(((N-1)*MMA0+Diff)/N), 1090 nb_setarg(6, State, MMA), 1091 MMAr is MMA/float(Resolution). 1092 1093 1094 /******************************* 1095 * MESSAGES * 1096 *******************************/ 1097 1098:- multifile 1099 prolog:message/3. 1100 1101prologmessage(httpd_started_server(Port, Options)) --> 1102 [ 'Started server at '-[] ], 1103 http_root(Port, Options). 1104prologmessage(httpd_stopped_worker(Self, Status)) --> 1105 [ 'Stopped worker ~p: ~p'-[Self, Status] ]. 1106prologmessage(httpd_restarted_worker(Self)) --> 1107 [ 'Replaced aborted worker ~p'-[Self] ]. 1108prologmessage(httpd(created_pool(Pool))) --> 1109 [ 'Created thread-pool ~p of size 10'-[Pool], nl, 1110 'Create this pool at startup-time or define the hook ', nl, 1111 'http:create_pool/1 to avoid this message and create a ', nl, 1112 'pool that fits the usage-profile.' 1113 ]. 1114 1115http_root(Address, Options) --> 1116 { landing_page(Address, URI, Options) }, 1117 [ '~w'-[URI] ]. 1118 1119landing_page(Host:Port, URI, Options) :- 1120 !, 1121 must_be(atom, Host), 1122 must_be(integer, Port), 1123 http_server_property(Port, scheme(Scheme)), 1124 ( default_port(Scheme, Port) 1125 -> format(atom(Base), '~w://~w', [Scheme, Host]) 1126 ; format(atom(Base), '~w://~w:~w', [Scheme, Host, Port]) 1127 ), 1128 entry_page(Base, URI, Options). 1129landing_page(unix_socket(Path), URI, _Options) :- 1130 !, 1131 format(string(URI), 'Unix domain socket "~w"', [Path]). 1132landing_page(Port, URI, Options) :- 1133 landing_page(localhost:Port, URI, Options). 1134 1135default_port(http, 80). 1136default_port(https, 443). 1137 1138entry_page(Base, URI, Options) :- 1139 option(entry_page(Entry), Options), 1140 !, 1141 uri_resolve(Entry, Base, URI). 1142entry_page(Base, URI, _) :- 1143 http_absolute_location(root(.), Entry, []), 1144 uri_resolve(Entry, Base, URI)
Threaded HTTP server
Most code doesn't need to use this directly; instead use library(http/http_server), which combines this library with the typical HTTP libraries that most servers need.
This library defines the HTTP server frontend of choice for SWI-Prolog. It is based on the multi-threading capabilities of SWI-Prolog and thus exploits multiple cores to serve requests concurrently. The server scales well and can cooperate with library(thread_pool) to control the number of concurrent requests of a given type. For example, it can be configured to handle 200 file download requests concurrently, 2 requests that potentially uses a lot of memory and 8 requests that use a lot of CPU resources.
On Unix systems, this library can be combined with library(http/http_unix_daemon) to realise a proper Unix service process that creates a web server at port 80, runs under a specific account, optionally detaches from the controlling terminal, etc.
Combined with library(http/http_ssl_plugin) from the SSL package, this library can be used to create an HTTPS server. See <plbase>/doc/packages/examples/ssl/https for an example server using a self-signed SSL certificate. */