View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2002-2024, University of Amsterdam
    7                              VU University Amsterdam
    8                              CWI, Amsterdam
    9                              SWI-Prolog Solutions b.v.
   10    All rights reserved.
   11
   12    Redistribution and use in source and binary forms, with or without
   13    modification, are permitted provided that the following conditions
   14    are met:
   15
   16    1. Redistributions of source code must retain the above copyright
   17       notice, this list of conditions and the following disclaimer.
   18
   19    2. Redistributions in binary form must reproduce the above copyright
   20       notice, this list of conditions and the following disclaimer in
   21       the documentation and/or other materials provided with the
   22       distribution.
   23
   24    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   25    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   26    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   27    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   28    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   29    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   30    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   31    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   32    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   33    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   34    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   35    POSSIBILITY OF SUCH DAMAGE.
   36*/
   37
   38:- module(thread_httpd,
   39          [ http_current_server/2,      % ?:Goal, ?Port
   40            http_server_property/2,     % ?Port, ?Property
   41            http_server/2,              % :Goal, +Options
   42            http_workers/2,             % +Port, ?WorkerCount
   43            http_add_worker/2,          % +Port, +Options
   44            http_current_worker/2,      % ?Port, ?ThreadID
   45            http_stop_server/2,         % +Port, +Options
   46            http_spawn/2,               % :Goal, +Options
   47
   48            http_requeue/1,             % +Request
   49            http_close_connection/1,    % +Request
   50            http_enough_workers/3       % +Queue, +Why, +Peer
   51          ]).   52:- use_module(library(debug)).   53:- use_module(library(error)).   54:- use_module(library(option)).   55:- use_module(library(socket)).   56:- use_module(library(thread_pool)).   57:- use_module(library(gensym)).   58:- use_module(http_wrapper).   59:- use_module(http_path).   60:- use_module(http_stream).   61
   62:- autoload(library(uri), [uri_resolve/3]).   63:- autoload(library(aggregate), [aggregate_all/3]).   64
   65:- predicate_options(http_server/2, 2,
   66                     [ port(any),
   67                       unix_socket(atom),
   68                       entry_page(atom),
   69                       tcp_socket(any),
   70                       workers(positive_integer),
   71                       timeout(number),
   72                       keep_alive_timeout(number),
   73                       silent(boolean),
   74                       ssl(list(any)),  % if http/http_ssl_plugin is loaded
   75                       pass_to(system:thread_create/3, 3)
   76                     ]).   77:- predicate_options(http_spawn/2, 2,
   78                     [ pool(atom),
   79                       pass_to(system:thread_create/3, 3),
   80                       pass_to(thread_pool:thread_create_in_pool/4, 4)
   81                     ]).   82:- predicate_options(http_add_worker/2, 2,
   83                     [ timeout(number),
   84                       keep_alive_timeout(number),
   85                       max_idle_time(number),
   86                       pass_to(system:thread_create/3, 3)
   87                     ]).

Threaded HTTP server

Most code doesn't need to use this directly; instead use library(http/http_server), which combines this library with the typical HTTP libraries that most servers need.

This library defines the HTTP server frontend of choice for SWI-Prolog. It is based on the multi-threading capabilities of SWI-Prolog and thus exploits multiple cores to serve requests concurrently. The server scales well and can cooperate with library(thread_pool) to control the number of concurrent requests of a given type. For example, it can be configured to handle 200 file download requests concurrently, 2 requests that potentially use a lot of memory and 8 requests that use a lot of CPU resources.

On Unix systems, this library can be combined with library(http/http_unix_daemon) to realise a proper Unix service process that creates a web server at port 80, runs under a specific account, optionally detaches from the controlling terminal, etc.

Combined with library(http/http_ssl_plugin) from the SSL package, this library can be used to create an HTTPS server. See <plbase>/doc/packages/examples/ssl/https for an example server using a self-signed SSL certificate. */

  115:- meta_predicate
  116    http_server(1, :),
  117    http_current_server(1, ?),
  118    http_spawn(0, +).  119
  120:- dynamic
  121    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
  122    queue_worker/2,         % Queue, ThreadID
  123    queue_options/2.        % Queue, Options
  124
  125:- multifile
  126    make_socket_hook/3,
  127    accept_hook/2,
  128    close_hook/1,
  129    open_client_hook/6,
  130    discard_client_hook/1,
  131    http:create_pool/1,
  132    http:schedule_workers/1.  133
  134:- meta_predicate
  135    thread_repeat_wait(0).
 http_server(:Goal, :Options) is det
Create a server at Port that calls Goal for each parsed request. Options provide a list of options. Defined options are
port(?Address)
Port to bind to. Address is either a port or a term Host:Port. The port may be a variable, causing the system to select a free port. See tcp_bind/2.
unix_socket(+Path)
Instead of binding to a TCP port, bind to a Unix Domain Socket at Path.
entry_page(+URI)
Affects the message printed while the server is started. Interpreted as a URI relative to the server root.
tcp_socket(+Socket)
If provided, use this socket instead of creating one and binding it to an address. The socket must be bound to an address. Note that this also allows binding an HTTP server to a Unix domain socket (AF_UNIX). See socket_create/2.
workers(+Count)
Determine the number of worker threads. Default is 5. This is fine for small scale usage. Public servers typically need a higher number.
timeout(+Seconds)
Maximum time of inactivity trying to read the request after a connection has been opened. Default is 60 seconds. See set_stream/2 using the timeout option.
keep_alive_timeout(+Seconds)
Time to keep `Keep alive' connections alive. Default is 2 seconds.
stack_limit(+Bytes)
Stack limit to use for the workers. The default is inherited from the main thread. If you need to control resource usage you may consider the spawn option of http_handler/3 and library(thread_pool).
silent(Bool)
If true (default false), do not print an informational message that the server was started.

A typical initialization for an HTTP server that uses http_dispatch/1 to relay requests to predicates is:

:- use_module(library(http/thread_httpd)).
:- use_module(library(http/http_dispatch)).

start_server(Port) :-
    http_server(http_dispatch, [port(Port)]).

Note that multiple servers can coexist in the same Prolog process. A notable application of this is to have both an HTTP and HTTPS server, where the HTTP server redirects to the HTTPS server for handling sensitive requests.

  % http_server(:Goal, :Options)
  %
  % Resolve the listen address from the options, then create the
  % socket, the worker pool and the accept thread.  The start-up
  % message is printed unless the caller passed silent(true).  If the
  % options hold neither port(_) nor unix_socket(_), an existence
  % error is raised.
  http_server(Goal, Module:UserOptions) :-
      server_address(Address, UserOptions),
      !,
      make_socket(Address, Module:UserOptions, ServerOptions),
      create_workers(ServerOptions),
      create_server(Goal, Address, ServerOptions),
      (   \+ option(silent(true), UserOptions)
      ->  print_message(informational,
                        httpd_started_server(Address, UserOptions))
      ;   true
      ).
  http_server(_, _:UserOptions) :-
      existence_error(server_address, UserOptions).
  214
  % server_address(-Address, +Options) is semidet
  %
  % Address is the port given by port(_), or unix_socket(Path) if only
  % unix_socket(Path) is given.  port(_) wins when both are present.
  % Fails if neither option is present.
  server_address(Address, Options) :-
      option(port(Port), Options),
      !,
      Address = Port.
  server_address(Address, Options) :-
      option(unix_socket(Path), Options),
      !,
      Address = unix_socket(Path).
  221
  % address_port(+Address, -Port) is det
  %
  % Reduce a server address to the key under which the server is
  % registered: the port of Interface:Port, the path of
  % unix_socket(Path), or the address itself.
  address_port(Address, Port) :-
      (   Address = _Interface:Port0,
          Port = Port0
      ->  true
      ;   Address = unix_socket(Path),
          Port = Path
      ->  true
      ;   Port = Address
      ).
  225
  % tcp_address(+Address) is semidet
  %
  % True if Address denotes a TCP endpoint: an unbound port (the
  % system picks one), an integer port, or an Interface:Port pair.
  tcp_address(Address) :-
      (   var(Address)
      ->  true
      ;   integer(Address)
      ->  true
      ;   Address = _Interface:_Port
      ).
  233
  % address_domain(+Address, -Domain) is det
  %
  % Domain is inet6 if Address is Interface:Port, Interface is not the
  % atom localhost, and ip_name/2 maps Interface to an ip/8 term.  In
  % all other cases, including errors from ip_name/2, Domain is inet.
  address_domain(Address, Domain) =>
      (   nonvar(Address),
          Address = Interface:_,
          Interface \== localhost,
          catch(ip_name(IP, Interface), error(_,_), fail),
          functor(IP, ip, 8)
      ->  Domain = inet6
      ;   Domain = inet
      ).
 make_socket(+Address, :OptionsIn, -OptionsOut) is det
Create the HTTP server socket and worker pool queue. OptionsOut is guaranteed to hold the option queue(QueueId).
Arguments:
OptionsIn- is qualified to allow passing the module-sensitive ssl option argument.
  % make_socket(+Address, :OptionsIn, -OptionsOut) is det
  %
  % Create the listening socket and name the worker queue.  Clauses
  % are tried in this order:
  %
  %   1. make_socket_hook/3 (TCP addresses only) may take over.
  %   2. A caller-supplied tcp_socket(_) is used as is.
  %   3. A TCP socket is created, bound and put in listen mode.
  %   4. If unix_domain_socket/1 is available, a Unix domain socket
  %      is created for unix_socket(Path).
  %
  % OptionsOut is OptionsIn extended with queue(Queue) and, where this
  % predicate created the socket, tcp_socket(Socket).  A socket created
  % here is closed again if configuring it fails or raises, so a
  % failed bind (e.g. address already in use) does not leak it.
  make_socket(Address, M:Options0, Options) :-
      tcp_address(Address),
      make_socket_hook(Address, M:Options0, Options),
      !.
  make_socket(Address, _:Options0, Options) :-
      option(tcp_socket(_), Options0),
      !,
      make_addr_atom('httpd', Address, Queue),
      Options = [ queue(Queue)
                | Options0
                ].
  make_socket(Address, _:Options0, Options) :-
      tcp_address(Address),
      !,
      address_domain(Address, Domain),
      socket_create(Socket, [domain(Domain)]),
      setup_listen_socket(Socket,
                          ( tcp_setopt(Socket, reuseaddr),
                            tcp_bind(Socket, Address),
                            tcp_listen(Socket, 64)
                          )),
      make_addr_atom('httpd', Address, Queue),
      Options = [ queue(Queue),
                  tcp_socket(Socket)
                | Options0
                ].
  :- if(current_predicate(unix_domain_socket/1)).
  make_socket(Address, _:Options0, Options) :-
      Address = unix_socket(Path),
      !,
      unix_domain_socket(Socket),
      setup_listen_socket(Socket,
                          ( tcp_bind(Socket, Path),
                            tcp_listen(Socket, 64)
                          )),
      make_addr_atom('httpd', Address, Queue),
      Options = [ queue(Queue),
                  tcp_socket(Socket)
                | Options0
                ].
  :- endif.

  % setup_listen_socket(+Socket, +Setup)
  %
  % Run Setup once.  If Setup raises, close Socket and re-throw; if it
  % fails, close Socket and fail.
  setup_listen_socket(Socket, Setup) :-
      (   catch(Setup, Error, true)
      ->  (   var(Error)
          ->  true
          ;   discard_listen_socket(Socket),
              throw(Error)
          )
      ;   discard_listen_socket(Socket),
          fail
      ).

  % Best-effort close: an error while closing must not mask the
  % original problem.
  discard_listen_socket(Socket) :-
      catch(tcp_close_socket(Socket), _, true).
 make_addr_atom(+Scheme, +Address, -Atom) is det
Create an atom that identifies the server's queue and thread resources.
  % make_addr_atom(+Scheme, +Address, -Atom) is det
  %
  % Atom is Scheme@Address written as text, e.g. 'httpd@localhost:8080'.
  % It names the server's queue and threads.
  %
  % Raises an instantiation error if (part of) Address is unbound and
  % a domain error if Address is not a recognised address term.
  make_addr_atom(Scheme, Address, Atom) :-
      phrase(address_parts(Address), Parts),
      atomic_list_concat([Scheme,@|Parts], Atom).

  % address_parts(+Address)// is det
  %
  % Emit the atomic parts of Address.  Every clause that recognises
  % its argument commits, so the catch-all clause is reached only for
  % unsupported terms.  The cut in the unix_socket(_) clause matters:
  % without it, backtracking into make_addr_atom/3 would raise a
  % domain error for a perfectly valid address.
  address_parts(Var) -->
      { var(Var),
        !,
        instantiation_error(Var)
      }.
  address_parts(Atomic) -->
      { atomic(Atomic) },
      !,
      [Atomic].
  address_parts(Host:Port) -->
      !,
      address_parts(Host), [:], address_parts(Port).
  address_parts(ip(A,B,C,D)) -->
      !,
      [ A, '.', B, '.', C, '.', D ].
  address_parts(unix_socket(Path)) -->
      !,
      [Path].
  address_parts(Address) -->
      { domain_error(http_server_address, Address) }.
 create_server(:Goal, +Address, +Options) is det
Create the main server thread that runs accept_server/3 to listen to new requests.
  % create_server(:Goal, +Address, +Options) is det
  %
  % Start the accept thread for the server and register the server in
  % current_server/6.  The thread alias is built from the scheme and
  % the port.  We wait for the thread's `server_started` message before
  % registering.
  create_server(Goal, Address, Options) :-
      get_time(Started),
      memberchk(queue(Queue), Options),
      scheme(Scheme, Options),
      autoload_https(Scheme),
      address_port(Address, Port),
      make_addr_atom(Scheme, Port, ThreadAlias),
      thread_self(Me),
      ThreadOptions = [ alias(ThreadAlias),
                        class(service)
                      ],
      thread_create(accept_server(Goal, Me, Options), _, ThreadOptions),
      thread_get_message(server_started),
      assert(current_server(Port, Goal, ThreadAlias, Queue, Scheme, Started)).
  341
  % scheme(-Scheme, +Options) is det
  %
  % An explicit scheme(_) option wins.  Otherwise the scheme is https
  % if an ssl(_) or ssl_instance(_) option is present and http if not.
  scheme(Scheme, Options) :-
      (   option(scheme(Scheme), Options)
      ->  true
      ;   (   option(ssl(_), Options)
          ;   option(ssl_instance(_), Options)
          )
      ->  Scheme = https
      ;   Scheme = http
      ).
  352
  % autoload_https(+Scheme) is det
  %
  % For https servers, load library(http/http_ssl_plugin) if no
  % accept_hook/2 clause exists yet and the plugin source is available.
  % Does nothing in all other cases.
  autoload_https(Scheme) :-
      (   Scheme = https,
          \+ clause(accept_hook(_, _), _),
          exists_source(library(http/http_ssl_plugin))
      ->  use_module(library(http/http_ssl_plugin))
      ;   true
      ).
 http_current_server(:Goal, ?Port) is nondet
True if Goal is the goal of a server at Port.
deprecated
- Use http_server_property(Port, goal(Goal))
  % Enumerate Goal/Port pairs from the current_server/6 registry.
  http_current_server(Goal, Port) :-
      current_server(Port, Goal, _Thread, _Queue, _Scheme, _Started).
 http_server_property(?Port, ?Property) is nondet
True if Property is a property of the HTTP server running at Port. Defined properties are:
goal(:Goal)
Goal used to start the server. This is often http_dispatch/1.
scheme(-Scheme)
Scheme is one of http or https.
start_time(?Time)
Time-stamp when the server was created.
  % http_server_property(?Port, ?Property) is nondet
  %
  % A Host:Port address with an integer port is reduced to the port;
  % anything else is used as the server key unchanged.
  http_server_property(Address, Property) :-
      (   Address = _Host:Port0,
          integer(Port0)
      ->  Port = Port0
      ;   Port = Address
      ),
      server_property(Property, Port).

  % server_property(?Property, ?Port) is nondet
  %
  % Each property is read from one column of current_server/6.
  server_property(goal(Goal), Port) :-
      current_server(Port, Goal, _Thread, _Queue, _Scheme, _Started).
  server_property(scheme(Scheme), Port) :-
      current_server(Port, _Goal, _Thread, _Queue, Scheme, _Started).
  server_property(start_time(Started), Port) :-
      current_server(Port, _Goal, _Thread, _Queue, _Scheme, Started).
 http_workers(?Port, -Workers) is nondet
http_workers(+Port, +Workers:int) is det
Query or set the number of workers for the server at this port. The number of workers is dynamically modified. Setting it to 1 (one) can be used to profile the worker using tprofile/1.
See also
- library(http/http_dyn_workers) implements dynamic management of the worker pool depending on usage.
  % http_workers(?Port, -Workers) is nondet
  % http_workers(+Port, +Workers:nonneg) is det
  %
  % With an integer Workers, resize the pool of the server at Port to
  % exactly that many workers.  Otherwise enumerate servers and count
  % their registered workers.
  %
  % Errors: type_error(nonneg, Workers) if Workers is a negative
  % integer.  A negative target would make resize_pool/2 request more
  % workers to quit than exist and then block forever waiting for
  % their replies.  existence_error(http_server, Port) if there is no
  % server at Port in set mode.
  http_workers(Port, Workers) :-
      integer(Workers),
      !,
      must_be(nonneg, Workers),
      must_be(ground, Port),
      (   current_server(Port, _, _, Queue, _, _)
      ->  resize_pool(Queue, Workers)
      ;   existence_error(http_server, Port)
      ).
  http_workers(Port, Workers) :-
      current_server(Port, _, _, Queue, _, _),
      aggregate_all(count, queue_worker(Queue, _Worker), Workers).
 http_add_worker(+Port, +Options) is det
Add a new worker to the HTTP server for port Port. Options overrule the default queue options. The following additional options are processed:
max_idle_time(+Seconds)
The created worker will automatically terminate if there is no new work within Seconds.
  % http_add_worker(+Port, +Options) is det
  %
  % Start one extra worker for the server at Port.  Options take
  % precedence over the options stored for the server's queue.
  % Raises existence_error(http_server, Port) if there is no such
  % server.
  http_add_worker(Port, Options) :-
      must_be(ground, Port),
      (   current_server(Port, _, _, Queue, _, _)
      ->  queue_options(Queue, QueueOptions),
          merge_options(Options, QueueOptions, WorkerOptions),
          atom_concat(Queue, '_', AliasBase),
          create_workers(1, 1, Queue, AliasBase, WorkerOptions)
      ;   existence_error(http_server, Port)
      ).
 http_current_worker(?Port, ?ThreadID) is nondet
True if ThreadID is the identifier of a Prolog thread serving Port. This predicate is motivated to allow for the use of arbitrary interaction with the worker thread for development and statistics.
  % Join the server registry with the worker registry on the queue.
  http_current_worker(Port, ThreadID) :-
      current_server(Port, _Goal, _Thread, Queue, _Scheme, _Started),
      queue_worker(Queue, ThreadID).
 accept_server(:Goal, +Initiator, +Options)
The goal of a small server-thread accepting new requests and posting them to the queue of workers.
  % accept_server(:Goal, +Initiator, +Options)
  %
  % Body of the accept thread.  Runs the accept loop until it is
  % interrupted by an http_stop(Stopper) exception, then unregisters
  % the server, closes connections that were accepted but not yet
  % handled, closes the server socket and tells Stopper we are done.
  accept_server(Goal, Initiator, Options) :-
      catch(accept_server2(Goal, Initiator, Options),
            http_stop(Stopper),
            true),
      thread_self(Me),
      debug(http(stop), '[~p]: accept server received ~p',
            [Me, http_stop(Stopper)]),
      retract(current_server(_, _, Me, Queue, _, _)),
      close_pending_accepts(Queue),
      close_server_socket(Options),
      thread_send_message(Stopper, http_stopped).
  469
  % accept_server2(:Goal, +Initiator, +Options)
  %
  % Tell Initiator that the server is up, then accept connections in
  % a failure-driven loop.  The loop only ends through an exception.
  accept_server2(Goal, Initiator, Options) :-
      thread_send_message(Initiator, server_started),
      repeat,
      accept_one(Goal, Options),
      fail.

  % accept_one(:Goal, +Options)
  %
  % Run accept_server3/2 once.  Exceptions accepted by
  % accept_rethrow_error/1 are re-thrown, other exceptions and plain
  % failure are reported as errors.  Succeeds unless it re-throws.
  accept_one(Goal, Options) :-
      catch(accept_server3(Goal, Options), Error, true),
      !,
      (   var(Error)
      ->  true
      ;   accept_rethrow_error(Error)
      ->  throw(Error)
      ;   print_message(error, Error)
      ).
  accept_one(Goal, Options) :-
      print_message(error,      % internal error
                    goal_failed(accept_server3(Goal, Options))).
  485
  % accept_server3(:Goal, +Options)
  %
  % Accept one connection.  accept_hook/2 gets the first chance; if it
  % does not succeed we accept on the plain TCP socket, hand the client
  % to the worker queue and check that enough workers are available.
  accept_server3(Goal, Options) :-
      (   accept_hook(Goal, Options)
      ->  true
      ;   memberchk(tcp_socket(Socket), Options),
          memberchk(queue(Queue), Options),
          debug(http(connection), 'Waiting for connection', []),
          tcp_accept(Socket, Client, Peer),
          sig_atomic(send_to_worker(Queue, Client, Goal, Peer)),
          http_enough_workers(Queue, accept, Peer)
      ).

  % Post the accepted client socket on the worker queue.
  send_to_worker(Queue, ClientSocket, Goal, Peer) :-
      debug(http(connection), 'New HTTP connection from ~p', [Peer]),
      thread_send_message(Queue, tcp_client(ClientSocket, Goal, Peer)).

  % Exceptions that must terminate the accept loop.
  accept_rethrow_error(http_stop(_Stopper)).
 close_server_socket(+Options)
Close the server socket.
  % close_server_socket(+Options)
  %
  % Let close_hook/1 close the server socket; if it does not succeed,
  % close the socket stored in the tcp_socket(_) option.  Fails if
  % neither applies.
  close_server_socket(Options) :-
      (   close_hook(Options)
      ->  true
      ;   memberchk(tcp_socket(Socket), Options),
          tcp_close_socket(Socket)
      ).
 close_pending_accepts(+Queue)
  % close_pending_accepts(+Queue)
  %
  % Drain Queue without waiting and close every client found in it.
  close_pending_accepts(Queue) :-
      thread_get_message(Queue, Msg, [timeout(0)]),
      !,
      close_client(Msg),
      close_pending_accepts(Queue).
  close_pending_accepts(_).

  % close_client(+Message)
  %
  % Plain TCP clients are closed directly.  Any other message is
  % offered to discard_client_hook/1; a warning is printed if the hook
  % does not handle it.
  close_client(tcp_client(Client, _Goal, _0Peer)) =>
      debug(http(stop), 'Closing connection from ~p during shut-down', [_0Peer]),
      tcp_close_socket(Client).
  close_client(Msg), discard_client_hook(Msg) =>
      true.
  close_client(Msg) =>
      print_message(warning, http_close_client(Msg)).
 http_stop_server(+Port, +Options)
Stop the indicated HTTP server gracefully. First stops all workers, then stops the server.
To be done
- Realise non-graceful stop
  % http_stop_server(+Port, +Options)
  %
  % Stop the server at Port.  A Host:Port address with a ground host
  % is reduced to Port.  All workers are stopped first, then the
  % accept thread is signalled with http_stop(Me).  If it has not
  % confirmed within 0.1 seconds we connect to localhost:Port;
  % failures of that connection are ignored.  Finally the accept
  % thread is joined and the queue is destroyed.
  http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
      ground(Host),
      !,
      http_stop_server(Port, Options).
  http_stop_server(Port, _Options) :-
      http_workers(Port, 0),                  % checks Port is ground
      current_server(Port, _, AcceptThread, Queue, _, _),
      retractall(queue_options(Queue, _)),
      debug(http(stop), 'Signalling HTTP server thread ~p to stop',
            [AcceptThread]),
      thread_self(Me),
      thread_signal(AcceptThread, throw(http_stop(Me))),
      (   \+ thread_get_message(Me, http_stopped, [timeout(0.1)])
      ->  catch(connect(localhost:Port), _, true)
      ;   true
      ),
      thread_join(AcceptThread, _0Status),
      debug(http(stop), 'Joined HTTP server thread ~p (~p)',
            [AcceptThread, _0Status]),
      message_queue_destroy(Queue).

  % Open a TCP connection to Address; the socket is always closed.
  connect(Address) :-
      setup_call_cleanup(
          tcp_socket(Sock),
          tcp_connect(Sock, Address),
          tcp_close_socket(Sock)).
 http_enough_workers(+Queue, +Why, +Peer) is det
Check that we have enough workers in our queue. If not, call the hook http:schedule_workers/1 to extend the worker pool. This predicate can be used by accept_hook/2.
  % http_enough_workers(+Queue, +Why, +Peer) is det
  %
  % Nothing needs doing if some thread is waiting on Queue, or if the
  % queue size is acceptable for Why (see enough/2).  Otherwise the
  % hook http:schedule_workers/1 is asked to extend the pool.  Errors
  % from the hook are printed, and failure of the hook is ignored.
  http_enough_workers(Queue, _Why, _Peer) :-
      message_queue_property(Queue, waiting(_0Idle)),
      !,
      debug(http(scheduler), '~D waiting for work; ok', [_0Idle]).
  http_enough_workers(Queue, Why, Peer) :-
      message_queue_property(Queue, size(Pending)),
      (   enough(Pending, Why)
      ->  debug(http(scheduler), '~D in queue; ok', [Pending])
      ;   ignore(ask_for_workers(Queue, Why, Peer, Pending))
      ).

  % Build the event dict for the server owning Queue and call the
  % scheduling hook.
  ask_for_workers(Queue, Why, Peer, Pending) :-
      current_server(Port, _, _, Queue, _, _),
      Data = _{ port:Port,
                reason:Why,
                peer:Peer,
                waiting:Pending,
                queue:Queue
              },
      debug(http(scheduler), 'Asking to reschedule: ~p', [Data]),
      catch(http:schedule_workers(Data),
            Error,
            print_message(error, Error)).

  % enough(+QueueSize, +Why): the queue is short enough.
  enough(0, _).
  enough(1, keep_alive).                  % I will be ready myself
 http:schedule_workers(+Data:dict) is semidet
Hook called if a new connection or a keep-alive connection cannot be scheduled immediately to a worker. Dict contains the following keys:
port:Port
Port number that identifies the server.
reason:Reason
One of accept for a new connection or keep_alive if a worker tries to reschedule itself.
peer:Peer
Identify the other end of the connection
waiting:Size
Number of messages waiting in the queue.
queue:Queue
Message queue used to dispatch accepted messages.

Note that, when called with reason:accept, we are called in the time-critical main accept loop. An implementation of this hook should typically send the event to a thread dedicated to dynamic worker-pool management.

See also
- http_add_worker/2 may be used to create (temporary) extra workers.
  626                 /*******************************
  627                 *    WORKER QUEUE OPERATIONS   *
  628                 *******************************/
 create_workers(+Options)
Create the pool of HTTP worker-threads. Each worker has an alias of the form Queue_N, where Queue is the name of the server's message queue (e.g. httpd@8080_1).
  % create_workers(+Options)
  %
  % Create the message queue named by queue(_) and start workers(N)
  % worker threads (default 5).  An error from creating the queue is
  % ignored.  The options are remembered in queue_options/2 so that
  % workers can be added later.
  create_workers(Options) :-
      option(workers(Count), Options, 5),
      option(queue(Queue), Options),
      catch(message_queue_create(Queue), _, true),
      atom_concat(Queue, '_', AliasBase),
      create_workers(1, Count, Queue, AliasBase, Options),
      assert(queue_options(Queue, Options)).

  % create_workers(+I, +N, +Queue, +AliasBase, +Options)
  %
  % Start one worker for every integer in I..N.  Each worker gets a
  % fresh alias from gensym/2 and is registered in queue_worker/2.
  create_workers(I, N, Queue, AliasBase, Options) :-
      (   I > N
      ->  true
      ;   gensym(AliasBase, Alias),
          thread_create(http_worker(Options), Id,
                        [ alias(Alias),
                          class(http)
                        | Options
                        ]),
          assertz(queue_worker(Queue, Id)),
          Next is I + 1,
          create_workers(Next, N, Queue, AliasBase, Options)
      ).
 resize_pool(+Queue, +Workers) is det
Create or destroy workers. If workers are destroyed, the call waits until the desired number of workers is reached.
  % resize_pool(+Queue, +Size) is det
  %
  % Grow the pool by starting the missing workers, or shrink it by
  % posting one quit(Me) message per surplus worker and waiting for
  % the same number of quitted(_) replies.
  resize_pool(Queue, Size) :-
      aggregate_all(count, queue_worker(Queue, _), Now),
      (   Now < Size
      ->  queue_options(Queue, Options),
          atom_concat(Queue, '_', AliasBase),
          First is Now+1,
          create_workers(First, Size, Queue, AliasBase, Options)
      ;   Now == Size
      ->  true
      ;   Now > Size
      ->  Surplus is Now - Size,
          thread_self(Me),
          forall(between(1, Surplus, _),
                 thread_send_message(Queue, quit(Me))),
          forall(between(1, Surplus, _),
                 thread_get_message(quitted(_)))
      ).
 http_worker(+Options)
Run HTTP worker main loop. Workers simply wait until they are passed an accepted socket to process a client.

If the message quit(Sender) is read from the queue, the worker stops.

  689http_worker(Options) :-
         % Main loop of one worker thread.  Options must hold
         % queue(Queue); max_idle_time(Seconds) is optional and
         % defaults to `infinite`.  The loop is failure driven: every
         % branch below except the quit branch ends in failure, which
         % backtracks into thread_repeat_wait/1.
         % NOTE(review): thread_repeat_wait/1 is defined outside this
         % view; presumably it re-runs get_work/3 on backtracking --
         % confirm.
  690    debug(http(scheduler), 'New worker', []),
         % Register done_worker/0 for the this_thread_exit event.
  691    prolog_listen(this_thread_exit, done_worker),
  692    option(queue(Queue), Options),
  693    option(max_idle_time(MaxIdle), Options, infinite),
  694    thread_repeat_wait(get_work(Queue, Message, MaxIdle)),
           % Both messages are printed after get_work/3 has returned.
  695      debug(http(worker), 'Waiting for a job ...', []),
  696      debug(http(worker), 'Got job ~p', [Message]),
  697      (   Message = quit(Sender)
           % The cut commits the loop: this worker stops here.
  698      ->  !,
  699          thread_self(Self),
  700          thread_detach(Self),
               % quit(idle) is produced by get_work/3 itself when the
               % idle timeout expires; there is nobody to notify.
  701          (   Sender == idle
  702          ->  true
               % Otherwise unregister and confirm to the requester
               % (resize_pool/2 waits for these quitted(_) messages).
  703          ;   retract(queue_worker(Queue, Self)),
  704              thread_send_message(Sender, quitted(Self))
  705          )
           % Any other message describes a client.  If open_client/7
           % fails we fall back into the loop for the next message.
  706      ;   open_client(Message, Queue, Goal, In, Out,
  707                      Options, ClientOptions),
               % Exceptions end up in Error; plain failure of
               % http_process/4 is mapped to goal_failed(...).
  708          (   catch(http_process(Goal, In, Out, ClientOptions),
  709                    Error, true)
  710          ->  true
  711          ;   Error = goal_failed(http_process/4)
  712          ),
  713          debug_reset_from_class,   % Restore debug mode after user nodebug
               % No error: fail to fetch the next job.  On error, print
               % it at the level given by current_message_level/2,
               % close the connection and then fetch the next job.
  714          (   var(Error)
  715          ->  fail
  716          ;   current_message_level(Error, Level),
  717              print_message(Level, Error),
  718              memberchk(peer(Peer), ClientOptions),
  719              close_connection(Peer, In, Out),
  720              fail
  721          )
  722      ).
  723
  % get_work(+Queue, -Message, +MaxIdle)
  %
  % Fetch the next message from Queue.  With MaxIdle = infinite we
  % block until a message arrives.  Otherwise we wait at most MaxIdle
  % seconds and return quit(idle) when nothing arrived in time.
  get_work(Queue, Message, MaxIdle) :-
      (   MaxIdle = infinite
      ->  thread_get_message(Queue, Message)
      ;   thread_get_message(Queue, Message, [timeout(MaxIdle)])
      ->  true
      ;   Message = quit(idle)
      ).
 open_client(+Message, +Queue, -Goal, -In, -Out, +Options, -ClientOptions) is semidet
Opens the connection to the client in a worker from the message sent to the queue by accept_server/3.
  % open_client(+Message, +Queue, -Goal, -In, -Out,
  %             +Options, -ClientOptions) is semidet
  %
  % A requeue/4 message carries an already open keep-alive connection;
  % we only wait for the client to send something.  Any other message
  % is opened through open_client/6, where errors are printed and turn
  % into failure.  The resulting client options are extended with the
  % pool(_) term needed for re-queueing and the read timeout (default
  % 60 seconds).
  open_client(requeue(In, Out, Goal, ClientOptions),
              _Queue, Goal, In, Out, WorkerOptions, ClientOptions) :-
      !,
      memberchk(peer(Peer), ClientOptions),
      option(keep_alive_timeout(IdleTimeout), WorkerOptions, 2),
      check_keep_alive_connection(In, IdleTimeout, Peer, In, Out).
  open_client(Message, Queue, Goal, In, Out, WorkerOptions, ClientOptions) :-
      ClientOptions = [ pool(client(Queue, Goal, In, Out)),
                        timeout(Timeout)
                      | PeerOptions
                      ],
      catch(open_client(Message, Goal, In, Out, PeerOptions, WorkerOptions),
            Error, report_error(Error)),
      option(timeout(Timeout), WorkerOptions, 60),
      (   debugging(http(connection))
      ->  memberchk(peer(Peer), PeerOptions),
          debug(http(connection), 'Opened connection from ~p', [Peer])
      ;   true
      ).
 open_client(+Message, +Goal, -In, -Out, -ClientOptions, +Options) is det
  % open_client(+Message, +Goal, -In, -Out, -ClientOptions, +Options)
  %
  % open_client_hook/6 gets the first chance.  If it does not succeed,
  % Message must be a tcp_client/3 term, whose socket is opened as a
  % plain HTTP connection.
  open_client(Message, Goal, In, Out, ClientOptions, Options) :-
      (   open_client_hook(Message, Goal, In, Out, ClientOptions, Options)
      ->  true
      ;   Message = tcp_client(Socket, Goal, Peer),
          ClientOptions = [ peer(Peer),
                            protocol(http)
                          ],
          tcp_open_socket(Socket, In, Out)
      ).

  % Print E as an error and fail.
  report_error(E) :-
      print_message(error, E),
      fail.
 check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet
Wait for the client for at most TimeOut seconds. Succeed if the client starts a new request within this time. Otherwise close the connection and fail.
  % check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet
  %
  % Peek at In using TimeOut as the stream timeout.  If a character
  % other than end-of-file is available, restore the old timeout and
  % succeed.  On end-of-file or any exception from peek_code/2, close
  % the connection and fail.
  check_keep_alive_connection(In, TMO, Peer, In, Out) :-
      stream_property(In, timeout(SavedTimeout)),
      set_stream(In, timeout(TMO)),
      debug(http(keep_alive), 'Waiting for keep-alive ...', []),
      catch(peek_code(In, Code), Error, true),
      (   var(Error),                 % no exception
          Code \== -1                 % no end-of-file
      ->  set_stream(In, timeout(SavedTimeout)),
          debug(http(keep_alive), '\tre-using keep-alive connection', [])
      ;   (   Code == -1
          ->  Reason = '\tRemote closed keep-alive connection'
          ;   Reason = '\tTimeout on keep-alive connection'
          ),
          debug(http(keep_alive), Reason, []),
          close_connection(Peer, In, Out),
          fail
      ).
 done_worker
Called when worker is terminated due to http_workers/2 or a (debugging) exception. In the latter case, recreate_worker/2 creates a new worker.
  % done_worker
  %
  % Exit handler of a worker thread.  If the worker is still
  % registered in queue_worker/2, unregister it and try to start a
  % replacement through recreate_worker/2.  If it is no longer
  % registered (it handled quit(Sender) itself) or no replacement is
  % started, report how the worker ended.
  done_worker :-
      thread_self(Self),
      thread_detach(Self),
      retract(queue_worker(Queue, Self)),
      thread_property(Self, status(Status)),
      !,
      (   catch(recreate_worker(Status, Queue), _, fail)
      ->  print_message(informational,
                        httpd_restarted_worker(Self))
      ;   report_stopped_worker(Self, Status)
      ).
  done_worker :-                                  % received quit(Sender)
      thread_self(Self),
      thread_property(Self, status(Status)),
      report_stopped_worker(Self, Status).

  % Print the stop message at the level that fits the exit status.
  report_stopped_worker(Thread, Status) :-
      done_status_message_level(Status, Level),
      print_message(Level,
                    httpd_stopped_worker(Thread, Status)).

  % done_status_message_level(+Status, -Level)
  %
  % Normal exit, abort and halt are not worth a message.
  done_status_message_level(true,                         silent) :- !.
  done_status_message_level(exception('$aborted'),        silent) :- !.
  done_status_message_level(exception(unwind(abort)),     silent) :- !.
  done_status_message_level(exception(unwind(halt(_))),   silent) :- !.
  done_status_message_level(_,                            informational).
 recreate_worker(+Status, +Queue) is semidet
Deal with the possibility that threads are, during development, killed with abort/0. We recreate the worker to avoid that eventually we run out of workers. If we are aborted due to a halt/0 call, thread_create/3 will raise a permission error.

The first clause deals with the possibility that we cannot write to user_error. This is possible when Prolog is started as a service using some service managers. Would be nice if we could write an error, but where?

  % recreate_worker(+Status, +Queue) is semidet
  %
  % Called with the exit status of a worker.  If the worker died
  % because user_error cannot be written, halt the process with status
  % 2.  If it died from one of the exceptions listed in
  % recreate_on_error/1, start one replacement worker using the
  % options stored for Queue.  Fails for any other status.
  recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
      halt(2).
  recreate_worker(exception(Reason), Queue) :-
      recreate_on_error(Reason),
      queue_options(Queue, WorkerOptions),
      atom_concat(Queue, '_', AliasBase),
      create_workers(1, 1, Queue, AliasBase, WorkerOptions).

  % Exceptions after which a worker is replaced.
  recreate_on_error('$aborted').
  recreate_on_error(unwind(abort)).
  recreate_on_error(time_limit_exceeded).
 thread_httpd:message_level(+Exception, -Level)
Determine the message stream used for exceptions that may occur during server_loop/5. Being multifile, clauses can be added by the application to refine error handling. See also message_hook/3 for further programming error handling.
  866:- multifile
  867    message_level/2.  868
  % message_level(+Exception, -Level)
  %
  % Default levels for exceptions that occur while serving a request.
  message_level(error(io_error(read, _), _), silent).
  message_level(error(socket_error(epipe,_), _), silent).
  message_level(error(http_write_short(_Obj,_Written), _), silent).
  message_level(error(timeout_error(read, _), _), informational).
  message_level(keep_alive_timeout, silent).

  % current_message_level(+Term, -Level) is det
  %
  % Level is given by the first matching message_level/2 clause, or
  % `error` if there is none.
  current_message_level(Term, Level) :-
      message_level(Term, Level),
      !.
  current_message_level(_, error).
 read_remaining_request(+StartBody, +Request) is semidet
If our handler did not read the complete request we must read the remainder if we are dealing with a Keep-alive connection.
  % read_remaining_request(+StartBody, +Request) is semidet
  %
  % If Request has a content_length(Len), skip the part of the body
  % that has not been read yet: StartBody+Len minus the current byte
  % count of the input stream.  Succeeds without reading if there is
  % no content_length(_).
  read_remaining_request(StartBody, Request) :-
      (   memberchk(content_length(Len), Request)
      ->  memberchk(pool(client(_Queue, _Goal, In, _Out)), Request),
          byte_count(In, Here),
          Left is StartBody+Len-Here,
          read_incomplete(In, Left)
      ;   true
      ).

  % read_incomplete(+In, +Left) is semidet
  %
  % Discard Left bytes from In by copying them to a null stream.
  % Fails if copying raises an error(_,_) exception.
  read_incomplete(In, Left) :-
      (   Left = 0
      ->  true
      ;   % Left < 1 000 000,                 % Optionally close anyway.
          catch(setup_call_cleanup(
                    open_null_stream(Sink),
                    copy_stream_data(In, Sink, Left),
                    close(Sink)),
                error(_,_),
                fail)
      ).
 http_requeue(+Header)
Re-queue a connection to the worker pool. This deals with processing additional requests on keep-alive connections.
  911http_requeue(Header) :-
  912    requeue_header(Header, ClientOptions),
  913    memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
  914    memberchk(peer(Peer), ClientOptions),
  915    http_enough_workers(Queue, keep_alive, Peer),
  916    thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
  917    !.
  918http_requeue(Header) :-
  919    debug(http(error), 'Re-queue failed: ~p', [Header]),
  920    fail.
  921
  922requeue_header([], []).
  923requeue_header([H|T0], [H|T]) :-
  924    requeue_keep(H),
  925    !,
  926    requeue_header(T0, T).
  927requeue_header([_|T0], T) :-
  928    requeue_header(T0, T).
  929
  930requeue_keep(pool(_)).
  931requeue_keep(peer(_)).
  932requeue_keep(protocol(_)).
 http_process(Message, Queue, +Options)
Handle a single client message on the given stream.
  939http_process(Goal, In, Out, Options) :-
  940    debug(http(server), 'Running server goal ~p on ~p -> ~p',
  941          [Goal, In, Out]),
  942    option(timeout(TMO), Options, 60),
  943    set_stream(In, timeout(TMO)),
  944    set_stream(Out, timeout(TMO)),
  945    http_wrapper(Goal, In, Out, Connection,
  946                 [ request(Request),
  947                   byte_count(StartBody)
  948                 | Options
  949                 ]),
  950    next(Connection, StartBody, Request).
  951
  %   next(+Connection, +StartBody, +Request)
  %
  %   Decide what to do with the connection after a request has been
  %   handled. Always succeeds: if next_/3 fails, a warning
  %   goal_failed(next(...)) is printed instead.

  next(Connection, StartBody, Request) :-
      (   next_(Connection, StartBody, Request)
      ->  true
      ;   print_message(warning,
                        goal_failed(next(Connection,StartBody,Request)))
      ).

  %   next_(+Connection, +StartBody, +Request)
  %
  %   Connection is one of:
  %
  %     - switch_protocol(SwitchGoal, _)
  %       Call SwitchGoal on the In/Out streams of the request. If it
  %       fails or raises (the exception is printed), the connection
  %       is closed.
  %     - spawned(ThreadId)
  %       Only emits a debug message on topic http(spawn).
  %     - anything else
  %       If it downcases to 'keep-alive', the rest of the request is
  %       read and the connection re-queued. If that does not
  %       succeed, the connection is closed.

  next_(switch_protocol(SwitchGoal, _SwitchOptions), _, Request) :-
      !,
      memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
      (   catch(call(SwitchGoal, In, Out), Ex,
                (   print_message(error, Ex),
                    fail))
      ->  true
      ;   http_close_connection(Request)
      ).
  next_(spawned(ThreadId), _, _) :-
      !,
      debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
  next_(Connection, StartBody, Request) :-
      (   downcase_atom(Connection, 'keep-alive'),
          read_remaining_request(StartBody, Request),
          http_requeue(Request)
      ->  true
      ;   http_close_connection(Request)
      ).
 http_close_connection(+Request)
Close connection associated to Request. See also http_requeue/1.
  982http_close_connection(Request) :-
  983    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
  984    memberchk(peer(Peer), Request),
  985    close_connection(Peer, In, Out).
 close_connection(+Peer, +In, +Out)
Closes the connection from the server to the client. Errors are currently silently ignored.
  992close_connection(Peer, In, Out) :-
  993    debug(http(connection), 'Closing connection from ~p', [Peer]),
  994    catch(close(In, [force(true)]), _, true),
  995    catch(close(Out, [force(true)]), _, true).
 http_spawn(:Goal, +Options) is det
Continue this connection on a new thread. A handler may call http_spawn/2 to start a new thread that continues processing the current request using Goal. The original thread returns to the worker pool for processing new requests. Options are passed to thread_create/3, except for:
pool(+Pool)
Interfaces to library(thread_pool), starting the thread on the given pool.

If a pool does not exist, this predicate calls the multifile hook create_pool/1 to create it. If this predicate succeeds the operation is retried.

 1013http_spawn(Goal, Options) :-
 1014    select_option(pool(Pool), Options, ThreadOptions),
 1015    !,
 1016    current_output(CGI),
 1017    Error = error(Formal, _),
 1018    catch(thread_create_in_pool(Pool,
 1019                                wrap_spawned(CGI, Goal), Id,
 1020                                [ detached(true),
 1021                                  class(http)
 1022                                | ThreadOptions
 1023                                ]),
 1024          Error,
 1025          true),
 1026    (   var(Formal)
 1027    ->  http_spawned(Id)
 1028    ;   Formal = resource_error(threads_in_pool(_))
 1029    ->  throw(http_reply(busy))
 1030    ;   Formal = existence_error(thread_pool, Pool),
 1031        create_pool(Pool)
 1032    ->  http_spawn(Goal, Options)
 1033    ;   throw(Error)
 1034    ).
 1035http_spawn(Goal, Options) :-
 1036    current_output(CGI),
 1037    thread_create(wrap_spawned(CGI, Goal), Id,
 1038                  [ detached(true),
 1039                    class(http)
 1040                  | Options
 1041                  ]),
 1042    http_spawned(Id).
 1043
 %   wrap_spawned(+CGI, :Goal)
 %
 %   Body of a thread started by http_spawn/2. Makes CGI the current
 %   output, fetches the request from it, records the byte count of
 %   the request's input stream, runs Goal through
 %   http_wrap_spawned/3 and passes the resulting connection status
 %   to next/3.

 wrap_spawned(CGI, Goal) :-
     set_output(CGI),
     cgi_property(CGI, request(Request)),
     memberchk(input(RequestIn), Request),
     byte_count(RequestIn, BodyStart),
     http_wrap_spawned(Goal, Request, ConnectionStatus),
     next(ConnectionStatus, BodyStart, Request).
 create_pool(+Pool)
Lazy creation of worker-pools for the HTTP server. This predicate calls the hook create_pool/1. If the hook fails it creates a default pool of size 10. This should suffice most typical usecases. Note that we get a permission error if the pool is already created. We can ignore this.
 1060create_pool(Pool) :-
 1061    E = error(permission_error(create, thread_pool, Pool), _),
 1062    catch(http:create_pool(Pool), E, true).
 1063create_pool(Pool) :-
 1064    print_message(informational, httpd(created_pool(Pool))),
 1065    thread_pool_create(Pool, 10, []).
 1066
 1067
 1068		 /*******************************
 1069		 *         WAIT POLICIES	*
 1070		 *******************************/
 1071
 :- meta_predicate
     thread_repeat_wait(0).

 %!  thread_repeat_wait(:Goal) is multi.
 %
 %   Acts as `repeat, thread_idle(Goal)`, choosing whether to use a
 %   long or short idle time based on the average firing rate. When
 %   the average interval is classified as `brief`, Goal is called
 %   directly instead of through thread_idle/2.

 thread_repeat_wait(Goal) :-
     new_rate_mma(5, 1000, RateState),
     repeat,
       notrace,
       update_rate_mma(RateState, AvgInterval),
       long(AvgInterval, Idle),
       (   Idle \== brief
       ->  thread_idle(Goal, Idle)
       ;   call(Goal)
       ).

 %   long(+MMA, -Kind)
 %
 %   Classify the average interval MMA: `brief` below 0.05, `short`
 %   below 1 and `long` otherwise.

 long(Avg, brief) :-
     Avg < 0.05,
     !.
 long(Avg, short) :-
     Avg < 1,
     !.
 long(_, long).
 new_rate_mma(+N, +Resolution, -State) is det
 update_rate_mma(!State, -MMA) is det
Implement Modified Moving Average computing the average time between requests as an exponential moving average with alpha=1/N.
Arguments:
Resolution- is the time resolution in 1/Resolution seconds. All storage is done in integers to avoid the need for stack freezing in nb_setarg/3.
See also
- https://en.wikipedia.org/wiki/Moving_average
 1111new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :-
 1112    current_prolog_flag(max_tagged_integer, MaxI),
 1113    get_time(Base).
 1114
 1115update_rate_mma(State, MMAr) :-
 1116    State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
 1117    get_time(Now),
 1118    Stamp is round((Now-Base)*Resolution),
 1119    (   Stamp > MaxI
 1120    ->  nb_setarg(1, State, Now),
 1121        nb_setarg(2, State, 0)
 1122    ;   true
 1123    ),
 1124    Diff is Stamp-Last,
 1125    nb_setarg(2, State, Stamp),
 1126    MMA is round(((N-1)*MMA0+Diff)/N),
 1127    nb_setarg(6, State, MMA),
 1128    MMAr is MMA/float(Resolution).
 1129
 1130
 1131                 /*******************************
 1132                 *            MESSAGES          *
 1133                 *******************************/
 1134
 :- multifile
     prolog:message/3.

 %   Message texts for the terms this library prints through
 %   print_message/2.

 prolog:message(httpd_started_server(Address, Options)) -->
     [ 'Started server at '-[] ],
     http_root(Address, Options).
 prolog:message(httpd_stopped_worker(Worker, Status)) -->
     [ 'Stopped worker ~p: ~p'-[Worker, Status] ].
 prolog:message(httpd_restarted_worker(Worker)) -->
     [ 'Replaced aborted worker ~p'-[Worker] ].
 prolog:message(httpd(created_pool(Pool))) -->
     [ 'Created thread-pool ~p of size 10'-[Pool], nl ],
     [ 'Create this pool at startup-time or define the hook ', nl ],
     [ 'http:create_pool/1 to avoid this message and create a ', nl ],
     [ 'pool that fits the usage-profile.' ].
 1151
 %   http_root(+Address, +Options)//
 %
 %   Emit a url(URI) message element for the page computed by
 %   landing_page/3.

 http_root(Address, Options) -->
     { landing_page(Address, URI, Options) },
     [ url(URI) ].

 %   landing_page(+Address, -URI, +Options)
 %
 %   Address is Host:Port, unix_socket(Path) or a plain Port, which
 %   is treated as localhost:Port. For Host:Port the scheme is taken
 %   from http_server_property/2 and the port is left out of the
 %   base URL if it is the default port for that scheme. For a Unix
 %   domain socket URI is a descriptive string rather than a URL.

 landing_page(Host:Port, URI, Options) :-
     !,
     must_be(atom, Host),
     must_be(integer, Port),
     http_server_property(Port, scheme(Scheme)),
     (   default_port(Scheme, Port)
     ->  Format = '~w://~w',
         Args = [Scheme, Host]
     ;   Format = '~w://~w:~w',
         Args = [Scheme, Host, Port]
     ),
     format(atom(Base), Format, Args),
     entry_page(Base, URI, Options).
 landing_page(unix_socket(Path), URI, _Options) :-
     !,
     format(string(URI), 'Unix domain socket "~w"', [Path]).
 landing_page(Port, URI, Options) :-
     landing_page(localhost:Port, URI, Options).

 %   default_port(?Scheme, ?Port)
 %
 %   Ports that are omitted from the URL for the given scheme.

 default_port(http, 80).
 default_port(https, 443).
 1174
 1175entry_page(Base, URI, Options) :-
 1176    option(entry_page(Entry), Options),
 1177    !,
 1178    uri_resolve(Entry, Base, URI).
 1179entry_page(Base, URI, _) :-
 1180    http_absolute_location(root(.), Entry, []),
 1181    uri_resolve(Entry, Base, URI)