View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2002-2022, University of Amsterdam
    7                              VU University Amsterdam
    8                              CWI, Amsterdam
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(thread_httpd,
   38          [ http_current_server/2,      % ?:Goal, ?Port
   39            http_server_property/2,     % ?Port, ?Property
   40            http_server/2,              % :Goal, +Options
   41            http_workers/2,             % +Port, ?WorkerCount
   42            http_add_worker/2,          % +Port, +Options
   43            http_current_worker/2,      % ?Port, ?ThreadID
   44            http_stop_server/2,         % +Port, +Options
   45            http_spawn/2,               % :Goal, +Options
   46
   47            http_requeue/1,             % +Request
   48            http_close_connection/1,    % +Request
   49            http_enough_workers/3       % +Queue, +Why, +Peer
   50          ]).   51:- use_module(library(debug)).   52:- use_module(library(error)).   53:- use_module(library(option)).   54:- use_module(library(socket)).   55:- use_module(library(thread_pool)).   56:- use_module(library(gensym)).   57:- use_module(http_wrapper).   58:- use_module(http_path).   59
   60:- autoload(library(uri), [uri_resolve/3]).   61
   62:- predicate_options(http_server/2, 2,
   63                     [ port(any),
   64                       unix_socket(atom),
   65                       entry_page(atom),
   66                       tcp_socket(any),
   67                       workers(positive_integer),
   68                       timeout(number),
   69                       keep_alive_timeout(number),
   70                       silent(boolean),
   71                       ssl(list(any)),  % if http/http_ssl_plugin is loaded
   72                       pass_to(system:thread_create/3, 3)
   73                     ]).   74:- predicate_options(http_spawn/2, 2,
   75                     [ pool(atom),
   76                       pass_to(system:thread_create/3, 3),
   77                       pass_to(thread_pool:thread_create_in_pool/4, 4)
   78                     ]).   79:- predicate_options(http_add_worker/2, 2,
   80                     [ timeout(number),
   81                       keep_alive_timeout(number),
   82                       max_idle_time(number),
   83                       pass_to(system:thread_create/3, 3)
   84                     ]).

Threaded HTTP server

Most code doesn't need to use this directly; instead use library(http/http_server), which combines this library with the typical HTTP libraries that most servers need.

This library defines the HTTP server frontend of choice for SWI-Prolog. It is based on the multi-threading capabilities of SWI-Prolog and thus exploits multiple cores to serve requests concurrently. The server scales well and can cooperate with library(thread_pool) to control the number of concurrent requests of a given type. For example, it can be configured to handle 200 file download requests concurrently, 2 requests that potentially use a lot of memory and 8 requests that use a lot of CPU resources.

On Unix systems, this library can be combined with library(http/http_unix_daemon) to realise a proper Unix service process that creates a web server at port 80, runs under a specific account, optionally detaches from the controlling terminal, etc.

Combined with library(http/http_ssl_plugin) from the SSL package, this library can be used to create an HTTPS server. See <plbase>/doc/packages/examples/ssl/https for an example server using a self-signed SSL certificate. */

  112:- meta_predicate
  113    http_server(1, :),
  114    http_current_server(1, ?),
  115    http_spawn(0, +).  116
  117:- dynamic
  118    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
  119    queue_worker/2,         % Queue, ThreadID
  120    queue_options/2.        % Queue, Options
  121
  122:- multifile
  123    make_socket_hook/3,
  124    accept_hook/2,
  125    close_hook/1,
  126    open_client_hook/6,
  127    discard_client_hook/1,
  128    http:create_pool/1,
  129    http:schedule_workers/1.  130
  131:- meta_predicate
  132    thread_repeat_wait(0).
 http_server(:Goal, :Options) is det
Create a server at Port that calls Goal for each parsed request. Options provide a list of options. Defined options are
port(?Address)
Port to bind to. Address is either a port or a term Host:Port. The port may be a variable, causing the system to select a free port. See tcp_bind/2.
unix_socket(+Path)
Instead of binding to a TCP port, bind to a Unix Domain Socket at Path.
entry_page(+URI)
Affects the message printed while the server is started. Interpreted as a URI relative to the server root.
tcp_socket(+Socket)
If provided, use this socket instead of creating a new one and binding it to an address. The socket must already be bound to an address.
workers(+Count)
Determine the number of worker threads. Default is 5. This is fine for small scale usage. Public servers typically need a higher number.
timeout(+Seconds)
Maximum time of inactivity trying to read the request after a connection has been opened. Default is 60 seconds. See set_stream/2 using the timeout option.
keep_alive_timeout(+Seconds)
Time to keep `Keep alive' connections alive. Default is 2 seconds.
stack_limit(+Bytes)
Stack limit to use for the workers. The default is inherited from the main thread. If you need to control resource usage you may consider the spawn option of http_handler/3 and library(thread_pool).
silent(Bool)
If true (default false), do not print an informational message that the server was started.

A typical initialization for an HTTP server that uses http_dispatch/1 to relay requests to predicates is:

:- use_module(library(http/thread_httpd)).
:- use_module(library(http/http_dispatch)).

start_server(Port) :-
    http_server(http_dispatch, [port(Port)]).

Note that multiple servers can coexist in the same Prolog process. A notable application of this is to have both an HTTP and HTTPS server, where the HTTP server redirects to the HTTPS server for handling sensitive requests.

  197http_server(Goal, M:Options0) :-
  198    server_address(Address, Options0),
  199    !,
  200    make_socket(Address, M:Options0, Options),
  201    create_workers(Options),
  202    create_server(Goal, Address, Options),
  203    (   option(silent(true), Options0)
  204    ->  true
  205    ;   print_message(informational,
  206                      httpd_started_server(Address, Options0))
  207    ).
  208http_server(_Goal, _:Options0) :-
  209    existence_error(server_address, Options0).
  210
  211server_address(Address, Options) :-
  212    (   option(port(Port), Options)
  213    ->  Address = Port
  214    ;   option(unix_socket(Path), Options)
  215    ->  Address = unix_socket(Path)
  216    ).
  217
  218address_port(_IFace:Port, Port) :- !.
  219address_port(unix_socket(Path), Path) :- !.
  220address_port(Address, Address) :- !.
  221
  222tcp_address(Port) :-
  223    var(Port),
  224    !.
  225tcp_address(Port) :-
  226    integer(Port),
  227    !.
  228tcp_address(_Iface:_Port).
  229
  230address_domain(localhost:_Port, Domain) =>
  231    Domain = inet.
  232address_domain(Iface:_Port, Domain) =>
  233    (   catch(ip_name(IP, Iface), error(_,_), fail),
  234        functor(IP, ip, 8)
  235    ->  Domain = inet6
  236    ;   Domain = inet
  237    ).
  238address_domain(_, Domain) =>
  239    Domain = inet.
 make_socket(+Address, :OptionsIn, -OptionsOut) is det
Create the HTTP server socket and worker pool queue. OptionsOut is guaranteed to hold the option queue(QueueId).
Arguments:
OptionsIn- is qualified to allow passing the module-sensitive ssl option argument.
  250make_socket(Address, M:Options0, Options) :-
  251    tcp_address(Address),
  252    make_socket_hook(Address, M:Options0, Options),
  253    !.
  254make_socket(Address, _:Options0, Options) :-
  255    option(tcp_socket(_), Options0),
  256    !,
  257    make_addr_atom('httpd', Address, Queue),
  258    Options = [ queue(Queue)
  259              | Options0
  260              ].
  261make_socket(Address, _:Options0, Options) :-
  262    tcp_address(Address),
  263    !,
  264    address_domain(Address, Domain),
  265    socket_create(Socket, [domain(Domain)]),
  266    tcp_setopt(Socket, reuseaddr),
  267    tcp_bind(Socket, Address),
  268    tcp_listen(Socket, 64),
  269    make_addr_atom('httpd', Address, Queue),
  270    Options = [ queue(Queue),
  271                tcp_socket(Socket)
  272              | Options0
  273              ].
  274:- if(current_predicate(unix_domain_socket/1)).  275make_socket(Address, _:Options0, Options) :-
  276    Address = unix_socket(Path),
  277    !,
  278    unix_domain_socket(Socket),
  279    tcp_bind(Socket, Path),
  280    tcp_listen(Socket, 64),
  281    make_addr_atom('httpd', Address, Queue),
  282    Options = [ queue(Queue),
  283                tcp_socket(Socket)
  284              | Options0
  285              ].
  286:- endif.
 make_addr_atom(+Scheme, +Address, -Atom) is det
Create an atom that identifies the server's queue and thread resources.
  293make_addr_atom(Scheme, Address, Atom) :-
  294    phrase(address_parts(Address), Parts),
  295    atomic_list_concat([Scheme,@|Parts], Atom).
  296
  297address_parts(Var) -->
  298    { var(Var),
  299      !,
  300      instantiation_error(Var)
  301    }.
  302address_parts(Atomic) -->
  303    { atomic(Atomic) },
  304    !,
  305    [Atomic].
  306address_parts(Host:Port) -->
  307    !,
  308    address_parts(Host), [:], address_parts(Port).
  309address_parts(ip(A,B,C,D)) -->
  310    !,
  311    [ A, '.', B, '.', C, '.', D ].
  312address_parts(unix_socket(Path)) -->
  313    [Path].
  314address_parts(Address) -->
  315    { domain_error(http_server_address, Address) }.
 create_server(:Goal, +Address, +Options) is det
Create the main server thread that runs accept_server/2 to listen to new requests.
  323create_server(Goal, Address, Options) :-
  324    get_time(StartTime),
  325    memberchk(queue(Queue), Options),
  326    scheme(Scheme, Options),
  327    autoload_https(Scheme),
  328    address_port(Address, Port),
  329    make_addr_atom(Scheme, Port, Alias),
  330    thread_self(Initiator),
  331    thread_create(accept_server(Goal, Initiator, Options), _,
  332                  [ alias(Alias)
  333                  ]),
  334    thread_get_message(server_started),
  335    assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).
  336
  337scheme(Scheme, Options) :-
  338    option(scheme(Scheme), Options),
  339    !.
  340scheme(Scheme, Options) :-
  341    (   option(ssl(_), Options)
  342    ;   option(ssl_instance(_), Options)
  343    ),
  344    !,
  345    Scheme = https.
  346scheme(http, _).
  347
  348autoload_https(https) :-
  349    \+ clause(accept_hook(_Goal, _Options), _),
  350    exists_source(library(http/http_ssl_plugin)),
  351    !,
  352    use_module(library(http/http_ssl_plugin)).
  353autoload_https(_).
 http_current_server(:Goal, ?Port) is nondet
True if Goal is the goal of a server at Port.
deprecated
- Use http_server_property(Port, goal(Goal))
  361http_current_server(Goal, Port) :-
  362    current_server(Port, Goal, _, _, _, _).
 http_server_property(?Port, ?Property) is nondet
True if Property is a property of the HTTP server running at Port. Defined properties are:
goal(:Goal)
Goal used to start the server. This is often http_dispatch/1.
scheme(-Scheme)
Scheme is one of http or https.
start_time(?Time)
Time-stamp when the server was created.
  378http_server_property(_:Port, Property) :-
  379    integer(Port),
  380    !,
  381    server_property(Property, Port).
  382http_server_property(Port, Property) :-
  383    server_property(Property, Port).
  384
  385server_property(goal(Goal), Port) :-
  386    current_server(Port, Goal, _, _, _, _).
  387server_property(scheme(Scheme), Port) :-
  388    current_server(Port, _, _, _, Scheme, _).
  389server_property(start_time(Time), Port) :-
  390    current_server(Port, _, _, _, _, Time).
 http_workers(+Port, -Workers) is det
http_workers(+Port, +Workers:int) is det
Query or set the number of workers for the server at this port. The number of workers is dynamically modified. Setting it to 1 (one) can be used to profile the worker using tprofile/1.
  400http_workers(Port, Workers) :-
  401    must_be(ground, Port),
  402    current_server(Port, _, _, Queue, _, _),
  403    !,
  404    (   integer(Workers)
  405    ->  resize_pool(Queue, Workers)
  406    ;   findall(W, queue_worker(Queue, W), WorkerIDs),
  407        length(WorkerIDs, Workers)
  408    ).
  409http_workers(Port, _) :-
  410    existence_error(http_server, Port).
 http_add_worker(+Port, +Options) is det
Add a new worker to the HTTP server for port Port. Options overrule the default queue options. The following additional options are processed:
max_idle_time(+Seconds)
The created worker will automatically terminate if there is no new work within Seconds.
  423http_add_worker(Port, Options) :-
  424    must_be(ground, Port),
  425    current_server(Port, _, _, Queue, _, _),
  426    !,
  427    queue_options(Queue, QueueOptions),
  428    merge_options(Options, QueueOptions, WorkerOptions),
  429    atom_concat(Queue, '_', AliasBase),
  430    create_workers(1, 1, Queue, AliasBase, WorkerOptions).
  431http_add_worker(Port, _) :-
  432    existence_error(http_server, Port).
 http_current_worker(?Port, ?ThreadID) is nondet
True if ThreadID is the identifier of a Prolog thread serving Port. This predicate exists to allow arbitrary interaction with the worker threads, e.g. for development and for collecting statistics.
  442http_current_worker(Port, ThreadID) :-
  443    current_server(Port, _, _, Queue, _, _),
  444    queue_worker(Queue, ThreadID).
 accept_server(:Goal, +Initiator, +Options)
The goal of a small server-thread accepting new requests and posting them to the queue of workers.
  452accept_server(Goal, Initiator, Options) :-
  453    catch(accept_server2(Goal, Initiator, Options), http_stop, true),
  454    thread_self(Thread),
  455    debug(http(stop), '[~p]: accept server received http_stop', [Thread]),
  456    retract(current_server(_Port, _, Thread, Queue, _Scheme, _StartTime)),
  457    close_pending_accepts(Queue),
  458    close_server_socket(Options).
  459
  460accept_server2(Goal, Initiator, Options) :-
  461    thread_send_message(Initiator, server_started),
  462    repeat,
  463      (   catch(accept_server3(Goal, Options), E, true)
  464      ->  (   var(E)
  465          ->  fail
  466          ;   accept_rethrow_error(E)
  467          ->  throw(E)
  468          ;   print_message(error, E),
  469              fail
  470          )
  471      ;   print_message(error,      % internal error
  472                        goal_failed(accept_server3(Goal, Options))),
  473          fail
  474      ).
  475
  476accept_server3(Goal, Options) :-
  477    accept_hook(Goal, Options),
  478    !.
  479accept_server3(Goal, Options) :-
  480    memberchk(tcp_socket(Socket), Options),
  481    memberchk(queue(Queue), Options),
  482    debug(http(connection), 'Waiting for connection', []),
  483    tcp_accept(Socket, Client, Peer),
  484    sig_atomic(send_to_worker(Queue, Client, Goal, Peer)),
  485    http_enough_workers(Queue, accept, Peer).
  486
  487send_to_worker(Queue, Client, Goal, Peer) :-
  488    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
  489    thread_send_message(Queue, tcp_client(Client, Goal, Peer)).
  490
  491accept_rethrow_error(http_stop).
  492accept_rethrow_error('$aborted').
 close_server_socket(+Options)
Close the server socket.
  499close_server_socket(Options) :-
  500    close_hook(Options),
  501    !.
  502close_server_socket(Options) :-
  503    memberchk(tcp_socket(Socket), Options),
  504    !,
  505    tcp_close_socket(Socket).
 close_pending_accepts(+Queue)
  509close_pending_accepts(Queue) :-
  510    (   thread_get_message(Queue, Msg, [timeout(0)])
  511    ->  close_client(Msg),
  512        close_pending_accepts(Queue)
  513    ;   true
  514    ).
  515
  516close_client(tcp_client(Client, _Goal, _0Peer)) =>
  517    debug(http(stop), 'Closing connection from ~p during shut-down', [_0Peer]),
  518    tcp_close_socket(Client).
  519close_client(Msg) =>
  520    (   discard_client_hook(Msg)
  521    ->  true
  522    ;   print_message(warning, http_close_client(Msg))
  523    ).
 http_stop_server(+Port, +Options)
Stop the indicated HTTP server gracefully. First stops all workers, then stops the server.
To be done
- Realise non-graceful stop
  533http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
  534    ground(Host),
  535    !,
  536    http_stop_server(Port, Options).
  537http_stop_server(Port, _Options) :-
  538    http_workers(Port, 0),                  % checks Port is ground
  539    current_server(Port, _, Thread, Queue, _Scheme, _Start),
  540    retractall(queue_options(Queue, _)),
  541    debug(http(stop), 'Signalling HTTP server thread ~p to stop', [Thread]),
  542    thread_signal(Thread, throw(http_stop)),
  543    catch(connect(localhost:Port), _, true),
  544    thread_join(Thread, _0Status),
  545    debug(http(stop), 'Joined HTTP server thread ~p (~p)', [Thread, _0Status]),
  546    message_queue_destroy(Queue).
  547
  548connect(Address) :-
  549    setup_call_cleanup(
  550        tcp_socket(Socket),
  551        tcp_connect(Socket, Address),
  552        tcp_close_socket(Socket)).
 http_enough_workers(+Queue, +Why, +Peer) is det
Check that we have enough workers in our queue. If not, call the hook http:schedule_workers/1 to extend the worker pool. This predicate can be used by accept_hook/2.
  560http_enough_workers(Queue, _Why, _Peer) :-
  561    message_queue_property(Queue, waiting(_0)),
  562    !,
  563    debug(http(scheduler), '~D waiting for work; ok', [_0]).
  564http_enough_workers(Queue, Why, Peer) :-
  565    message_queue_property(Queue, size(Size)),
  566    (   enough(Size, Why)
  567    ->  debug(http(scheduler), '~D in queue; ok', [Size])
  568    ;   current_server(Port, _, _, Queue, _, _),
  569        Data = _{ port:Port,
  570                  reason:Why,
  571                  peer:Peer,
  572                  waiting:Size,
  573                  queue:Queue
  574                },
  575        debug(http(scheduler), 'Asking to reschedule: ~p', [Data]),
  576        catch(http:schedule_workers(Data),
  577              Error,
  578              print_message(error, Error))
  579    ->  true
  580    ;   true
  581    ).
  582
  583enough(0, _).
  584enough(1, keep_alive).                  % I will be ready myself
 http:schedule_workers(+Data:dict) is semidet
Hook called if a new connection or a keep-alive connection cannot be scheduled immediately to a worker. Dict contains the following keys:
port:Port
Port number that identifies the server.
reason:Reason
One of accept for a new connection or keep_alive if a worker tries to reschedule itself.
peer:Peer
Identify the other end of the connection
waiting:Size
Number of messages waiting in the queue.
queue:Queue
Message queue used to dispatch accepted messages.

Note that, when called with reason:accept, we are called in the time-critical main accept loop. An implementation of this hook should typically forward the event to a thread dedicated to dynamic worker-pool management.

See also
- http_add_worker/2 may be used to create (temporary) extra workers.
  614                 /*******************************
  615                 *    WORKER QUEUE OPERATIONS   *
  616                 *******************************/
 create_workers(+Options)
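Create the pool of HTTP worker-threads. Each worker gets an alias of the form <Queue>_N, where Queue is the name of the server's message queue and N is a generated sequence number.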
  623create_workers(Options) :-
  624    option(workers(N), Options, 5),
  625    option(queue(Queue), Options),
  626    catch(message_queue_create(Queue), _, true),
  627    atom_concat(Queue, '_', AliasBase),
  628    create_workers(1, N, Queue, AliasBase, Options),
  629    assert(queue_options(Queue, Options)).
  630
  631create_workers(I, N, _, _, _) :-
  632    I > N,
  633    !.
  634create_workers(I, N, Queue, AliasBase, Options) :-
  635    gensym(AliasBase, Alias),
  636    thread_create(http_worker(Options), Id,
  637                  [ alias(Alias)
  638                  | Options
  639                  ]),
  640    assertz(queue_worker(Queue, Id)),
  641    I2 is I + 1,
  642    create_workers(I2, N, Queue, AliasBase, Options).
 resize_pool(+Queue, +Workers) is det
Create or destroy workers. If workers are destroyed, the call waits until the desired number of workers is reached.
  650resize_pool(Queue, Size) :-
  651    findall(W, queue_worker(Queue, W), Workers),
  652    length(Workers, Now),
  653    (   Now < Size
  654    ->  queue_options(Queue, Options),
  655        atom_concat(Queue, '_', AliasBase),
  656        I0 is Now+1,
  657        create_workers(I0, Size, Queue, AliasBase, Options)
  658    ;   Now == Size
  659    ->  true
  660    ;   Now > Size
  661    ->  Excess is Now - Size,
  662        thread_self(Me),
  663        forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
  664        forall(between(1, Excess, _), thread_get_message(quitted(_)))
  665    ).
 http_worker(+Options)
Run HTTP worker main loop. Workers simply wait until they are passed an accepted socket to process a client.

If the message quit(Sender) is read from the queue, the worker stops.

  676http_worker(Options) :-
  677    debug(http(scheduler), 'New worker', []),
  678    prolog_listen(this_thread_exit, done_worker),
  679    option(queue(Queue), Options),
  680    option(max_idle_time(MaxIdle), Options, infinite),
  681    thread_repeat_wait(get_work(Queue, Message, MaxIdle)),
  682      debug(http(worker), 'Waiting for a job ...', []),
  683      debug(http(worker), 'Got job ~p', [Message]),
  684      (   Message = quit(Sender)
  685      ->  !,
  686          thread_self(Self),
  687          thread_detach(Self),
  688          (   Sender == idle
  689          ->  true
  690          ;   retract(queue_worker(Queue, Self)),
  691              thread_send_message(Sender, quitted(Self))
  692          )
  693      ;   open_client(Message, Queue, Goal, In, Out,
  694                      Options, ClientOptions),
  695          (   catch(http_process(Goal, In, Out, ClientOptions),
  696                    Error, true)
  697          ->  true
  698          ;   Error = goal_failed(http_process/4)
  699          ),
  700          (   var(Error)
  701          ->  fail
  702          ;   current_message_level(Error, Level),
  703              print_message(Level, Error),
  704              memberchk(peer(Peer), ClientOptions),
  705              close_connection(Peer, In, Out),
  706              fail
  707          )
  708      ).
  709
  710get_work(Queue, Message, infinite) :-
  711    !,
  712    thread_get_message(Queue, Message).
  713get_work(Queue, Message, MaxIdle) :-
  714    (   thread_get_message(Queue, Message, [timeout(MaxIdle)])
  715    ->  true
  716    ;   Message = quit(idle)
  717    ).
 open_client(+Message, +Queue, -Goal, -In, -Out, +Options, -ClientOptions) is semidet
Opens the connection to the client in a worker from the message sent to the queue by accept_server/2.
  726open_client(requeue(In, Out, Goal, ClOpts),
  727            _, Goal, In, Out, Opts, ClOpts) :-
  728    !,
  729    memberchk(peer(Peer), ClOpts),
  730    option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
  731    check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out).
  732open_client(Message, Queue, Goal, In, Out, Opts,
  733            [ pool(client(Queue, Goal, In, Out)),
  734              timeout(Timeout)
  735            | Options
  736            ]) :-
  737    catch(open_client(Message, Goal, In, Out, Options, Opts),
  738          E, report_error(E)),
  739    option(timeout(Timeout), Opts, 60),
  740    (   debugging(http(connection))
  741    ->  memberchk(peer(Peer), Options),
  742        debug(http(connection), 'Opened connection from ~p', [Peer])
  743    ;   true
  744    ).
 open_client(+Message, +Goal, -In, -Out, -ClientOptions, +Options) is det
  750open_client(Message, Goal, In, Out, ClientOptions, Options) :-
  751    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
  752    !.
  753open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
  754            [ peer(Peer),
  755              protocol(http)
  756            ], _) :-
  757    tcp_open_socket(Socket, In, Out).
  758
  759report_error(E) :-
  760    print_message(error, E),
  761    fail.
 check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet
Wait for the client for at most TimeOut seconds. Succeed if the client starts a new request within this time. Otherwise close the connection and fail.
  770check_keep_alive_connection(In, TMO, Peer, In, Out) :-
  771    stream_property(In, timeout(Old)),
  772    set_stream(In, timeout(TMO)),
  773    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
  774    catch(peek_code(In, Code), E, true),
  775    (   var(E),                     % no exception
  776        Code \== -1                 % no end-of-file
  777    ->  set_stream(In, timeout(Old)),
  778        debug(http(keep_alive), '\tre-using keep-alive connection', [])
  779    ;   (   Code == -1
  780        ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
  781        ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
  782        ),
  783        close_connection(Peer, In, Out),
  784        fail
  785    ).
 done_worker
Called when worker is terminated due to http_workers/2 or a (debugging) exception. In the latter case, recreate_worker/2 creates a new worker.
  794done_worker :-
  795    thread_self(Self),
  796    thread_detach(Self),
  797    retract(queue_worker(Queue, Self)),
  798    thread_property(Self, status(Status)),
  799    !,
  800    (   catch(recreate_worker(Status, Queue), _, fail)
  801    ->  print_message(informational,
  802                      httpd_restarted_worker(Self))
  803    ;   done_status_message_level(Status, Level),
  804        print_message(Level,
  805                      httpd_stopped_worker(Self, Status))
  806    ).
  807done_worker :-                                  % received quit(Sender)
  808    thread_self(Self),
  809    thread_property(Self, status(Status)),
  810    done_status_message_level(Status, Level),
  811    print_message(Level,
  812                  httpd_stopped_worker(Self, Status)).
  813
  814done_status_message_level(true, silent) :- !.
  815done_status_message_level(exception('$aborted'), silent) :- !.
  816done_status_message_level(_, informational).
 recreate_worker(+Status, +Queue) is semidet
Deal with the possibility that threads are, during development, killed with abort/0. We recreate the worker to avoid that eventually we run out of workers. If we are aborted due to a halt/0 call, thread_create/3 will raise a permission error.

The first clause deals with the possibility that we cannot write to user_error. This is possible when Prolog is started as a service using some service managers. Would be nice if we could write an error, but where?

  831recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
  832    halt(2).
  833recreate_worker(exception(Error), Queue) :-
  834    recreate_on_error(Error),
  835    queue_options(Queue, Options),
  836    atom_concat(Queue, '_', AliasBase),
  837    create_workers(1, 1, Queue, AliasBase, Options).
  838
  839recreate_on_error('$aborted').
  840recreate_on_error(time_limit_exceeded).
 thread_httpd:message_level(+Exception, -Level)
Determine the message stream used for exceptions that may occur during server_loop/5. Being multifile, clauses can be added by the application to refine error handling. See also message_hook/3 for further programming error handling.
  849:- multifile
  850    message_level/2.  851
  852message_level(error(io_error(read, _), _),               silent).
  853message_level(error(socket_error(epipe,_), _),           silent).
  854message_level(error(http_write_short(_Obj,_Written), _), silent).
  855message_level(error(timeout_error(read, _), _),          informational).
  856message_level(keep_alive_timeout,                        silent).
  857
  858current_message_level(Term, Level) :-
  859    (   message_level(Term, Level)
  860    ->  true
  861    ;   Level = error
  862    ).
 http_requeue(+Header)
Re-queue a connection to the worker pool. This deals with processing additional requests on keep-alive connections.
  870http_requeue(Header) :-
  871    requeue_header(Header, ClientOptions),
  872    memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
  873    memberchk(peer(Peer), ClientOptions),
  874    http_enough_workers(Queue, keep_alive, Peer),
  875    thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
  876    !.
  877http_requeue(Header) :-
  878    debug(http(error), 'Re-queue failed: ~p', [Header]),
  879    fail.
  880
  881requeue_header([], []).
  882requeue_header([H|T0], [H|T]) :-
  883    requeue_keep(H),
  884    !,
  885    requeue_header(T0, T).
  886requeue_header([_|T0], T) :-
  887    requeue_header(T0, T).
  888
  889requeue_keep(pool(_)).
  890requeue_keep(peer(_)).
  891requeue_keep(protocol(_)).
 http_process(:Goal, +In, +Out, +Options)
Handle a single client message on the given stream.
  898http_process(Goal, In, Out, Options) :-
  899    debug(http(server), 'Running server goal ~p on ~p -> ~p',
  900          [Goal, In, Out]),
  901    option(timeout(TMO), Options, 60),
  902    set_stream(In, timeout(TMO)),
  903    set_stream(Out, timeout(TMO)),
  904    http_wrapper(Goal, In, Out, Connection,
  905                 [ request(Request)
  906                 | Options
  907                 ]),
  908    next(Connection, Request).
  909
  910next(Connection, Request) :-
  911    next_(Connection, Request), !.
  912next(Connection, Request) :-
  913    print_message(warning, goal_failed(next(Connection,Request))).
  914
  %!  next_(+Connection, +Request)
  %
  %   Act on the Connection term that resulted from handling Request:
  %
  %     - switch_protocol(SwitchGoal, _)
  %       Call SwitchGoal on the In/Out streams found in the pool/1
  %       term of Request. If the goal fails or raises (the exception
  %       is printed as an error), the connection is closed.
  %     - spawned(ThreadId)
  %       Another thread took over; only a debug message is emitted.
  %     - anything else
  %       If Connection is `keep-alive` (compared case-insensitively)
  %       and http_requeue/1 succeeds, the connection is re-queued.
  %       Otherwise it is closed.

  next_(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
      !,
      memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
      (   catch(call(SwitchGoal, In, Out),
                Error,
                ( print_message(error, Error),
                  fail
                ))
      ->  true
      ;   http_close_connection(Request)
      ).
  next_(spawned(ThreadId), _Request) :-
      !,
      debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
  next_(Connection, Request) :-
      (   downcase_atom(Connection, 'keep-alive'),
          http_requeue(Request)
      ->  true
      ;   http_close_connection(Request)
      ).
 http_close_connection(+Request)
Close connection associated to Request. See also http_requeue/1.
  %!  http_close_connection(+Request)
  %
  %   Close the client connection that belongs to Request. The streams
  %   are taken from the pool(client(_, _, In, Out)) term and the peer
  %   from the peer(Peer) term of Request; fails if either is missing.

  http_close_connection(Request) :-
      memberchk(pool(client(_, _, ClientIn, ClientOut)), Request),
      memberchk(peer(ClientPeer), Request),
      close_connection(ClientPeer, ClientIn, ClientOut).
 close_connection(+Peer, +In, +Out)
Closes the connection from the server to the client. Errors are currently silently ignored.
  %!  close_connection(+Peer, +In, +Out)
  %
  %   Close both streams of a client connection, In first and then Out.
  %   Peer is only used for the debug message on the http(connection)
  %   topic. Any exception raised while closing is discarded.

  close_connection(Peer, In, Out) :-
      debug(http(connection), 'Closing connection from ~p', [Peer]),
      close_connection_stream(In),
      close_connection_stream(Out).

  %   Force-close a single stream, ignoring any exception.
  close_connection_stream(Stream) :-
      catch(close(Stream, [force(true)]), _, true).
 http_spawn(:Goal, +Options) is det
Continue this connection on a new thread. A handler may call http_spawn/2 to start a new thread that continues processing the current request using Goal. The original thread returns to the worker pool for processing new requests. Options are passed to thread_create/3, except for:
pool(+Pool)
Interfaces to library(thread_pool), starting the thread on the given pool.

If a pool does not exist, this predicate calls the multifile hook create_pool/1 to create it. If this predicate succeeds the operation is retried.

  %!  http_spawn(:Goal, +Options)
  %
  %   Continue the current request in a new detached thread that runs
  %   wrap_spawned/2 on the current output stream and Goal, then report
  %   the new thread through http_spawned/1.
  %
  %   If Options contains pool(Pool), the thread is created with
  %   thread_create_in_pool/4 and the remaining options. Otherwise
  %   thread_create/3 is used with all Options. Failures of the pooled
  %   creation are handled by spawn_in_pool_failed/4.

  http_spawn(Goal, Options) :-
      select_option(pool(Pool), Options, ThreadOptions),
      !,
      current_output(CGI),
      catch(thread_create_in_pool(Pool,
                                  wrap_spawned(CGI, Goal),
                                  ThreadId,
                                  [detached(true)|ThreadOptions]),
            Ex,
            true),
      (   var(Ex)                     % no exception: thread is running
      ->  http_spawned(ThreadId)
      ;   spawn_in_pool_failed(Ex, Pool, Goal, Options)
      ).
  http_spawn(Goal, Options) :-
      current_output(CGI),
      thread_create(wrap_spawned(CGI, Goal),
                    ThreadId,
                    [detached(true)|Options]),
      http_spawned(ThreadId).

  %   spawn_in_pool_failed(+Exception, +Pool, :Goal, +Options)
  %
  %   React to an exception from thread_create_in_pool/4:
  %
  %     - The pool has no free threads: throw http_reply(busy).
  %     - The pool does not exist and create_pool/1 succeeds: retry
  %       http_spawn/2 with the original Options.
  %     - Anything else: re-throw the exception unchanged.

  spawn_in_pool_failed(error(resource_error(threads_in_pool(_)), _),
                       _Pool, _Goal, _Options) :-
      !,
      throw(http_reply(busy)).
  spawn_in_pool_failed(error(existence_error(thread_pool, Pool), _),
                       Pool, Goal, Options) :-
      create_pool(Pool),
      !,
      http_spawn(Goal, Options).
  spawn_in_pool_failed(Ex, _Pool, _Goal, _Options) :-
      throw(Ex).
  997
  %!  wrap_spawned(+CGI, :Goal)
  %
  %   Entry point of a thread started by http_spawn/2. Makes CGI the
  %   current output of this thread, runs Goal through
  %   http_wrap_spawned/3 and then lets next/2 decide what to do with
  %   the connection.

  wrap_spawned(CGI, Goal) :-
      set_output(CGI),
      http_wrap_spawned(Goal, SpawnedRequest, ConnectionState),
      next(ConnectionState, SpawnedRequest).
 create_pool(+Pool)
 Lazy creation of worker-pools for the HTTP server. This predicate calls the hook http:create_pool/1. If the hook fails it creates a default pool of size 10. This should suffice for most typical use cases. Note that we get a permission error if the pool is already created. We can ignore this.
 %!  create_pool(+Pool)
 %
 %   Lazily create the thread pool Pool. The hook http:create_pool/1
 %   is tried first. If the hook fails, a default pool of size 10 is
 %   created and an informational message is printed.
 %
 %   A permission error for creating Pool means the pool already
 %   exists, for instance because another thread created it in the
 %   meantime. That is treated as success in both branches, so the
 %   caller can simply retry. The predicate commits to the hook when
 %   it succeeds, so backtracking never creates a second, default pool.

 create_pool(Pool) :-
     E = error(permission_error(create, thread_pool, Pool), _),
     catch(http:create_pool(Pool), E, true),
     !.
 create_pool(Pool) :-
     E = error(permission_error(create, thread_pool, Pool), _),
     catch(( thread_pool_create(Pool, 10, []),
             % only announce the pool if we really created it
             print_message(informational, httpd(created_pool(Pool)))
           ),
           E,
           true).
 1017
 1018
 1019		 /*******************************
 1020		 *         WAIT POLICIES	*
 1021		 *******************************/
 1022
 % The argument of thread_repeat_wait/1 is a goal (meta-argument 0).
 :- meta_predicate thread_repeat_wait(0).
 thread_repeat_wait(:Goal) is multi
Acts as repeat, thread_idle(Goal), choosing whether to use a long or short idle time based on the average firing rate.
 %!  thread_repeat_wait(:Goal) is multi
 %
 %   Run Goal over and over, as a repeat/0 driven loop. Each iteration
 %   updates a moving average of the time between iterations
 %   (update_rate_mma/2) and classifies it with long/2. When the
 %   average is `brief`, Goal is called directly. Otherwise it is run
 %   through thread_idle/2 with the duration `short` or `long`.

 thread_repeat_wait(Goal) :-
     new_rate_mma(5, 1000, RateState),
     repeat,
       update_rate_mma(RateState, AvgInterval),
       long(AvgInterval, Duration),
       (   Duration \== brief
       ->  thread_idle(Goal, Duration)
       ;   call(Goal)
       ).
 1040
 %!  long(+MMA, -Duration)
 %
 %   Classify the average interval MMA: `brief` if it is below 0.05,
 %   `short` if it is below 1 and `long` otherwise.
 %
 %   Duration is unified before the comparison in each branch, so the
 %   behaviour also matches the clause-per-case formulation when
 %   Duration is already bound.

 long(MMA, Duration) :-
     (   Duration = brief,
         MMA < 0.05
     ->  true
     ;   Duration = short,
         MMA < 1
     ->  true
     ;   Duration = long
     ).
 new_rate_mma(+N, +Resolution, -State) is det
 update_rate_mma(!State, -MMA) is det
 Implement Modified Moving Average computing the average time between requests as an exponential moving average with alpha=1/N.
Arguments:
Resolution- is the time resolution in 1/Resolution seconds. All storage is done in integers to avoid the need for stack freezing in nb_setarg/3.
See also
- https://en.wikipedia.org/wiki/Moving_average
 %!  new_rate_mma(+N, +Resolution, -State) is det
 %
 %   Create the state term for update_rate_mma/2. State is
 %   rstate(Base, Last, MaxI, Resolution, N, MMA), where Base is the
 %   current time, Last and MMA start at 0 and MaxI is the value of the
 %   Prolog flag `max_tagged_integer`.

 new_rate_mma(N, Resolution, State) :-
     get_time(Base),
     current_prolog_flag(max_tagged_integer, MaxI),
     State = rstate(Base, 0, MaxI, Resolution, N, 0).
 1064
 %!  update_rate_mma(!State, -MMA) is det
 %
 %   Register an event in State (see new_rate_mma/3) and return the
 %   updated moving average of the time between events in seconds.
 %
 %   Time stamps are stored as integers counting 1/Resolution seconds
 %   since Base. The interval since the previous event is folded into
 %   the average as MMA = ((N-1)*MMA0 + Diff)/N, rounded to an integer.
 %
 %   When the stamp exceeds MaxI the state is rebased: Base becomes the
 %   current time and the last stamp becomes 0. The last stamp must not
 %   be overwritten with the old-base stamp afterwards. Doing so would
 %   make the next interval a large negative number relative to the
 %   new base.

 update_rate_mma(State, MMAr) :-
     State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
     get_time(Now),
     Stamp is round((Now-Base)*Resolution),
     Diff is Stamp-Last,              % Stamp and Last share the old Base
     (   Stamp > MaxI
     ->  nb_setarg(1, State, Now),    % rebase: Now is stamp 0
         nb_setarg(2, State, 0)
     ;   nb_setarg(2, State, Stamp)
     ),
     MMA is round(((N-1)*MMA0+Diff)/N),
     nb_setarg(6, State, MMA),
     MMAr is MMA/float(Resolution).
 1079
 1080
 1081                 /*******************************
 1082                 *            MESSAGES          *
 1083                 *******************************/
 1084
 :- multifile prolog:message/3.

 %   prolog:message(+Term)//
 %
 %   Message bodies for the terms this server passes to
 %   print_message/2: server start-up, stopped and restarted workers,
 %   and lazy creation of a default thread pool.

 prolog:message(httpd_started_server(Address, Options)) -->
     [ 'Started server at '-[] ],
     http_root(Address, Options).
 prolog:message(httpd_stopped_worker(Worker, Status)) -->
     [ 'Stopped worker ~p: ~p'-[Worker, Status] ].
 prolog:message(httpd_restarted_worker(Worker)) -->
     [ 'Replaced aborted worker ~p'-[Worker] ].
 prolog:message(httpd(created_pool(Pool))) -->
     [ 'Created thread-pool ~p of size 10'-[Pool], nl ],
     [ 'Create this pool at startup-time or define the hook ', nl ],
     [ 'http:create_pool/1 to avoid this message and create a ', nl ],
     [ 'pool that fits the usage-profile.' ].
 1101
 %   http_root(+Address, +Options)//
 %
 %   Emit the landing page of the server at Address, as computed by
 %   landing_page/3, as a single '~w' message element.

 http_root(Address, Options) -->
     { landing_page(Address, Location, Options) },
     [ '~w'-[Location] ].
 1105
 %!  landing_page(+Address, -URI, +Options)
 %
 %   URI describes where the server at Address can be reached.
 %
 %     - Host:Port
 %       Host must be an atom and Port an integer. The scheme is
 %       obtained from http_server_property/2. The port is left out of
 %       the base URI when it is the default port for that scheme
 %       (default_port/2). The result is completed by entry_page/3.
 %     - unix_socket(Path)
 %       URI is a string naming the socket; Options are ignored.
 %     - Port
 %       Handled as localhost:Port.

 landing_page(Host:Port, URI, Options) :-
     !,
     must_be(atom, Host),
     must_be(integer, Port),
     http_server_property(Port, scheme(Scheme)),
     (   default_port(Scheme, Port)
     ->  Format = '~w://~w',
         Args = [Scheme, Host]
     ;   Format = '~w://~w:~w',
         Args = [Scheme, Host, Port]
     ),
     format(atom(Base), Format, Args),
     entry_page(Base, URI, Options).
 landing_page(unix_socket(Path), URI, _Options) :-
     !,
     format(string(URI), 'Unix domain socket "~w"', [Path]).
 landing_page(Port, URI, Options) :-
     landing_page(localhost:Port, URI, Options).

 %!  default_port(?Scheme, ?Port)
 %
 %   Port is the port that may be omitted from a URI using Scheme.

 default_port(http, 80).
 default_port(https, 443).
 1124
 1125entry_page(Base, URI, Options) :-
 1126    option(entry_page(Entry), Options),
 1127    !,
 1128    uri_resolve(Entry, Base, URI).
 1129entry_page(Base, URI, _) :-
 1130    http_absolute_location(root(.), Entry, []),
 1131    uri_resolve(Entry, Base, URI)