View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2002-2022, University of Amsterdam
    7                              VU University Amsterdam
    8                              CWI, Amsterdam
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(thread_httpd,
   38          [ http_current_server/2,      % ?:Goal, ?Port
   39            http_server_property/2,     % ?Port, ?Property
   40            http_server/2,              % :Goal, +Options
   41            http_workers/2,             % +Port, ?WorkerCount
   42            http_add_worker/2,          % +Port, +Options
   43            http_current_worker/2,      % ?Port, ?ThreadID
   44            http_stop_server/2,         % +Port, +Options
   45            http_spawn/2,               % :Goal, +Options
   46
   47            http_requeue/1,             % +Request
   48            http_close_connection/1,    % +Request
   49            http_enough_workers/3       % +Queue, +Why, +Peer
   50          ]).   51:- use_module(library(debug)).   52:- use_module(library(error)).   53:- use_module(library(option)).   54:- use_module(library(socket)).   55:- use_module(library(thread_pool)).   56:- use_module(library(gensym)).   57:- use_module(http_wrapper).   58:- use_module(http_path).   59
   60:- autoload(library(uri), [uri_resolve/3]).   61
   62:- predicate_options(http_server/2, 2,
   63                     [ port(any),
   64                       unix_socket(atom),
   65                       entry_page(atom),
   66                       tcp_socket(any),
   67                       workers(positive_integer),
   68                       timeout(number),
   69                       keep_alive_timeout(number),
   70                       silent(boolean),
   71                       ssl(list(any)),  % if http/http_ssl_plugin is loaded
   72                       pass_to(system:thread_create/3, 3)
   73                     ]).   74:- predicate_options(http_spawn/2, 2,
   75                     [ pool(atom),
   76                       pass_to(system:thread_create/3, 3),
   77                       pass_to(thread_pool:thread_create_in_pool/4, 4)
   78                     ]).   79:- predicate_options(http_add_worker/2, 2,
   80                     [ timeout(number),
   81                       keep_alive_timeout(number),
   82                       max_idle_time(number),
   83                       pass_to(system:thread_create/3, 3)
   84                     ]).   85
   86/** <module> Threaded HTTP server
   87
   88Most   code   doesn't   need  to   use  this   directly;  instead   use
   89library(http/http_server),  which  combines   this  library  with   the
   90typical HTTP libraries that most servers need.
   91
   92This library defines the HTTP server  frontend of choice for SWI-Prolog.
   93It is based on the multi-threading   capabilities of SWI-Prolog and thus
   94exploits multiple cores  to  serve   requests  concurrently.  The server
   95scales well and can cooperate with   library(thread_pool) to control the
   96number of concurrent requests of a given   type.  For example, it can be
   97configured to handle 200 file download requests concurrently, 2 requests
    98that potentially use a lot of memory and 8 requests that use a lot of
   99CPU resources.
  100
  101On   Unix   systems,    this    library     can    be    combined   with
  102library(http/http_unix_daemon) to realise a proper  Unix service process
  103that creates a web server at  port   80,  runs under a specific account,
  104optionally detaches from the controlling terminal, etc.
  105
  106Combined with library(http/http_ssl_plugin) from the   SSL package, this
  107library   can   be   used   to    create     an    HTTPS   server.   See
  108<plbase>/doc/packages/examples/ssl/https for an example   server using a
  109self-signed SSL certificate.
  110*/
  111
  112:- meta_predicate
  113    http_server(1, :),
  114    http_current_server(1, ?),
  115    http_spawn(0, +).  116
  117:- dynamic
  118    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
  119    queue_worker/2,         % Queue, ThreadID
  120    queue_options/2.        % Queue, Options
  121
  122:- multifile
  123    make_socket_hook/3,
  124    accept_hook/2,
  125    close_hook/1,
  126    open_client_hook/6,
  127    discard_client_hook/1,
  128    http:create_pool/1,
  129    http:schedule_workers/1.  130
  131:- meta_predicate
  132    thread_repeat_wait(0).  133
  134%!  http_server(:Goal, :Options) is det.
  135%
  136%   Create a server at Port that calls Goal for each parsed request.
  137%   Options provide a list of options. Defined options are
  138%
  139%     * port(?Address)
  140%     Port to bind to.  Address is either a port or a term
  141%     Host:Port. The port may be a variable, causing the system
  142%     to select a free port.  See tcp_bind/2.
  143%
  144%     * unix_socket(+Path)
  145%     Instead of binding to a TCP port, bind to a _Unix Domain
  146%     Socket_ at Path.
  147%
  148%     * entry_page(+URI)
  149%     Affects the message printed while the server is started.
  150%     Interpreted as a URI relative to the server root.
  151%
  152%     * tcp_socket(+Socket)
   153%     If provided, use this socket instead of creating one and
  154%     binding it to an address.  The socket must be bound to an
  155%     address.
  156%
  157%     * workers(+Count)
  158%     Determine the number of worker threads.  Default is 5.  This
  159%     is fine for small scale usage.  Public servers typically need
  160%     a higher number.
  161%
  162%     * timeout(+Seconds)
  163%     Maximum time of inactivity trying to read the request after a
  164%     connection has been opened.  Default is 60 seconds.  See
   165%     set_stream/2 using the _timeout_ option.
  166%
  167%     * keep_alive_timeout(+Seconds)
  168%     Time to keep `Keep alive' connections alive.  Default is
  169%     2 seconds.
  170%
  171%     * stack_limit(+Bytes)
  172%     Stack limit to use for the workers.  The default is inherited
  173%     from the `main` thread.
  174%     If you need to control resource usage you may consider the
  175%     `spawn` option of http_handler/3 and library(thread_pool).
  176%
  177%     * silent(Bool)
  178%     If `true` (default `false`), do not print an informational
  179%     message that the server was started.
  180%
  181%   A  typical  initialization  for  an    HTTP   server  that  uses
  182%   http_dispatch/1 to relay requests to predicates is:
  183%
  184%     ==
  185%     :- use_module(library(http/thread_httpd)).
  186%     :- use_module(library(http/http_dispatch)).
  187%
  188%     start_server(Port) :-
  189%         http_server(http_dispatch, [port(Port)]).
  190%     ==
  191%
  192%   Note that multiple servers  can  coexist   in  the  same  Prolog
  193%   process. A notable application of this is   to have both an HTTP
  194%   and HTTPS server, where the HTTP   server redirects to the HTTPS
  195%   server for handling sensitive requests.
  196
  197http_server(Goal, M:Options0) :-
  198    server_address(Address, Options0),
  199    !,
  200    make_socket(Address, M:Options0, Options),
  201    create_workers(Options),
  202    create_server(Goal, Address, Options),
  203    (   option(silent(true), Options0)
  204    ->  true
  205    ;   print_message(informational,
  206                      httpd_started_server(Address, Options0))
  207    ).
  208http_server(_Goal, _:Options0) :-
  209    existence_error(server_address, Options0).
  210
  211server_address(Address, Options) :-
  212    (   option(port(Port), Options)
  213    ->  Address = Port
  214    ;   option(unix_socket(Path), Options)
  215    ->  Address = unix_socket(Path)
  216    ).
  217
  218address_port(_IFace:Port, Port) :- !.
  219address_port(unix_socket(Path), Path) :- !.
  220address_port(Address, Address) :- !.
  221
  222tcp_address(Port) :-
  223    var(Port),
  224    !.
  225tcp_address(Port) :-
  226    integer(Port),
  227    !.
  228tcp_address(_Iface:_Port).
  229
  230address_domain(localhost:_Port, Domain) =>
  231    Domain = inet.
  232address_domain(Iface:_Port, Domain) =>
  233    (   catch(ip_name(IP, Iface), error(_,_), fail),
  234        functor(IP, ip, 8)
  235    ->  Domain = inet6
  236    ;   Domain = inet
  237    ).
  238address_domain(_, Domain) =>
  239    Domain = inet.
  240
  241
  242%!  make_socket(+Address, :OptionsIn, -OptionsOut) is det.
  243%
  244%   Create the HTTP server socket and  worker pool queue. OptionsOut
  245%   is quaranteed to hold the option queue(QueueId).
  246%
  247%   @arg   OptionsIn   is   qualified   to     allow   passing   the
  248%   module-sensitive ssl option argument.
  249
  250make_socket(Address, M:Options0, Options) :-
  251    tcp_address(Address),
  252    make_socket_hook(Address, M:Options0, Options),
  253    !.
  254make_socket(Address, _:Options0, Options) :-
  255    option(tcp_socket(_), Options0),
  256    !,
  257    make_addr_atom('httpd', Address, Queue),
  258    Options = [ queue(Queue)
  259              | Options0
  260              ].
  261make_socket(Address, _:Options0, Options) :-
  262    tcp_address(Address),
  263    !,
  264    address_domain(Address, Domain),
  265    socket_create(Socket, [domain(Domain)]),
  266    tcp_setopt(Socket, reuseaddr),
  267    tcp_bind(Socket, Address),
  268    tcp_listen(Socket, 64),
  269    make_addr_atom('httpd', Address, Queue),
  270    Options = [ queue(Queue),
  271                tcp_socket(Socket)
  272              | Options0
  273              ].
  274:- if(current_predicate(unix_domain_socket/1)).  275make_socket(Address, _:Options0, Options) :-
  276    Address = unix_socket(Path),
  277    !,
  278    unix_domain_socket(Socket),
  279    tcp_bind(Socket, Path),
  280    tcp_listen(Socket, 64),
  281    make_addr_atom('httpd', Address, Queue),
  282    Options = [ queue(Queue),
  283                tcp_socket(Socket)
  284              | Options0
  285              ].
  286:- endif.  287
  288%!  make_addr_atom(+Scheme, +Address, -Atom) is det.
  289%
  290%   Create an atom that identifies  the   server's  queue and thread
  291%   resources.
  292
  293make_addr_atom(Scheme, Address, Atom) :-
  294    phrase(address_parts(Address), Parts),
  295    atomic_list_concat([Scheme,@|Parts], Atom).
  296
  297address_parts(Var) -->
  298    { var(Var),
  299      !,
  300      instantiation_error(Var)
  301    }.
  302address_parts(Atomic) -->
  303    { atomic(Atomic) },
  304    !,
  305    [Atomic].
  306address_parts(Host:Port) -->
  307    !,
  308    address_parts(Host), [:], address_parts(Port).
  309address_parts(ip(A,B,C,D)) -->
  310    !,
  311    [ A, '.', B, '.', C, '.', D ].
  312address_parts(unix_socket(Path)) -->
  313    [Path].
  314address_parts(Address) -->
  315    { domain_error(http_server_address, Address) }.
  316
  317
  318%!  create_server(:Goal, +Address, +Options) is det.
  319%
  320%   Create the main server thread that runs accept_server/2 to
  321%   listen to new requests.
  322
  323create_server(Goal, Address, Options) :-
  324    get_time(StartTime),
  325    memberchk(queue(Queue), Options),
  326    scheme(Scheme, Options),
  327    autoload_https(Scheme),
  328    address_port(Address, Port),
  329    make_addr_atom(Scheme, Port, Alias),
  330    thread_self(Initiator),
  331    thread_create(accept_server(Goal, Initiator, Options), _,
  332                  [ alias(Alias)
  333                  ]),
  334    thread_get_message(server_started),
  335    assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).
  336
  337scheme(Scheme, Options) :-
  338    option(scheme(Scheme), Options),
  339    !.
  340scheme(Scheme, Options) :-
  341    (   option(ssl(_), Options)
  342    ;   option(ssl_instance(_), Options)
  343    ),
  344    !,
  345    Scheme = https.
  346scheme(http, _).
  347
  348autoload_https(https) :-
  349    \+ clause(accept_hook(_Goal, _Options), _),
  350    exists_source(library(http/http_ssl_plugin)),
  351    !,
  352    use_module(library(http/http_ssl_plugin)).
  353autoload_https(_).
  354
  355%!  http_current_server(:Goal, ?Port) is nondet.
  356%
  357%   True if Goal is the goal of a server at Port.
  358%
  359%   @deprecated Use http_server_property(Port, goal(Goal))
  360
  361http_current_server(Goal, Port) :-
  362    current_server(Port, Goal, _, _, _, _).
  363
  364
  365%!  http_server_property(?Port, ?Property) is nondet.
  366%
  367%   True if Property is a property of the HTTP server running at
  368%   Port.  Defined properties are:
  369%
  370%       * goal(:Goal)
  371%       Goal used to start the server. This is often
  372%       http_dispatch/1.
  373%       * scheme(-Scheme)
  374%       Scheme is one of `http` or `https`.
  375%       * start_time(?Time)
  376%       Time-stamp when the server was created.
  377
  378http_server_property(_:Port, Property) :-
  379    integer(Port),
  380    !,
  381    server_property(Property, Port).
  382http_server_property(Port, Property) :-
  383    server_property(Property, Port).
  384
  385server_property(goal(Goal), Port) :-
  386    current_server(Port, Goal, _, _, _, _).
  387server_property(scheme(Scheme), Port) :-
  388    current_server(Port, _, _, _, Scheme, _).
  389server_property(start_time(Time), Port) :-
  390    current_server(Port, _, _, _, _, Time).
  391
  392
  393%!  http_workers(+Port, -Workers) is det.
  394%!  http_workers(+Port, +Workers:int) is det.
  395%
  396%   Query or set the number of workers  for the server at this port.
  397%   The number of workers is dynamically   modified. Setting it to 1
  398%   (one) can be used to profile the worker using tprofile/1.
  399
  400http_workers(Port, Workers) :-
  401    must_be(ground, Port),
  402    current_server(Port, _, _, Queue, _, _),
  403    !,
  404    (   integer(Workers)
  405    ->  resize_pool(Queue, Workers)
  406    ;   findall(W, queue_worker(Queue, W), WorkerIDs),
  407        length(WorkerIDs, Workers)
  408    ).
  409http_workers(Port, _) :-
  410    existence_error(http_server, Port).
  411
  412
  413%!  http_add_worker(+Port, +Options) is det.
  414%
  415%   Add a new worker to  the  HTTP   server  for  port Port. Options
  416%   overrule the default queue  options.   The  following additional
  417%   options are processed:
  418%
  419%     - max_idle_time(+Seconds)
  420%     The created worker will automatically terminate if there is
  421%     no new work within Seconds.
  422
  423http_add_worker(Port, Options) :-
  424    must_be(ground, Port),
  425    current_server(Port, _, _, Queue, _, _),
  426    !,
  427    queue_options(Queue, QueueOptions),
  428    merge_options(Options, QueueOptions, WorkerOptions),
  429    atom_concat(Queue, '_', AliasBase),
  430    create_workers(1, 1, Queue, AliasBase, WorkerOptions).
  431http_add_worker(Port, _) :-
  432    existence_error(http_server, Port).
  433
  434
  435%!  http_current_worker(?Port, ?ThreadID) is nondet.
  436%
  437%   True if ThreadID is the identifier   of  a Prolog thread serving
  438%   Port. This predicate is  motivated  to   allow  for  the  use of
  439%   arbitrary interaction with the worker thread for development and
  440%   statistics.
  441
  442http_current_worker(Port, ThreadID) :-
  443    current_server(Port, _, _, Queue, _, _),
  444    queue_worker(Queue, ThreadID).
  445
  446
  447%!  accept_server(:Goal, +Initiator, +Options)
  448%
  449%   The goal of a small server-thread accepting new requests and
  450%   posting them to the queue of workers.
  451
  452accept_server(Goal, Initiator, Options) :-
  453    Ex = http_stop(Stopper),
  454    catch(accept_server2(Goal, Initiator, Options), Ex, true),
  455    thread_self(Thread),
  456    debug(http(stop), '[~p]: accept server received ~p', [Thread, Ex]),
  457    retract(current_server(_Port, _, Thread, Queue, _Scheme, _StartTime)),
  458    close_pending_accepts(Queue),
  459    close_server_socket(Options),
  460    thread_send_message(Stopper, http_stopped).
  461
  462accept_server2(Goal, Initiator, Options) :-
  463    thread_send_message(Initiator, server_started),
  464    repeat,
  465      (   catch(accept_server3(Goal, Options), E, true)
  466      ->  (   var(E)
  467          ->  fail
  468          ;   accept_rethrow_error(E)
  469          ->  throw(E)
  470          ;   print_message(error, E),
  471              fail
  472          )
  473      ;   print_message(error,      % internal error
  474                        goal_failed(accept_server3(Goal, Options))),
  475          fail
  476      ).
  477
  478accept_server3(Goal, Options) :-
  479    accept_hook(Goal, Options),
  480    !.
  481accept_server3(Goal, Options) :-
  482    memberchk(tcp_socket(Socket), Options),
  483    memberchk(queue(Queue), Options),
  484    debug(http(connection), 'Waiting for connection', []),
  485    tcp_accept(Socket, Client, Peer),
  486    sig_atomic(send_to_worker(Queue, Client, Goal, Peer)),
  487    http_enough_workers(Queue, accept, Peer).
  488
  489send_to_worker(Queue, Client, Goal, Peer) :-
  490    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
  491    thread_send_message(Queue, tcp_client(Client, Goal, Peer)).
  492
  493accept_rethrow_error(http_stop(_)).
  494accept_rethrow_error('$aborted').
  495
  496
  497%!  close_server_socket(+Options)
  498%
  499%   Close the server socket.
  500
  501close_server_socket(Options) :-
  502    close_hook(Options),
  503    !.
  504close_server_socket(Options) :-
  505    memberchk(tcp_socket(Socket), Options),
  506    !,
  507    tcp_close_socket(Socket).
  508
  509%!  close_pending_accepts(+Queue)
  510
  511close_pending_accepts(Queue) :-
  512    (   thread_get_message(Queue, Msg, [timeout(0)])
  513    ->  close_client(Msg),
  514        close_pending_accepts(Queue)
  515    ;   true
  516    ).
  517
  518close_client(tcp_client(Client, _Goal, _0Peer)) =>
  519    debug(http(stop), 'Closing connection from ~p during shut-down', [_0Peer]),
  520    tcp_close_socket(Client).
  521close_client(Msg) =>
  522    (   discard_client_hook(Msg)
  523    ->  true
  524    ;   print_message(warning, http_close_client(Msg))
  525    ).
  526
  527
  528%!  http_stop_server(+Port, +Options)
  529%
  530%   Stop the indicated  HTTP  server   gracefully.  First  stops all
  531%   workers, then stops the server.
  532%
  533%   @tbd    Realise non-graceful stop
  534
  535http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
  536    ground(Host),
  537    !,
  538    http_stop_server(Port, Options).
  539http_stop_server(Port, _Options) :-
  540    http_workers(Port, 0),                  % checks Port is ground
  541    current_server(Port, _, Thread, Queue, _Scheme, _Start),
  542    retractall(queue_options(Queue, _)),
  543    debug(http(stop), 'Signalling HTTP server thread ~p to stop', [Thread]),
  544    thread_self(Stopper),
  545    thread_signal(Thread, throw(http_stop(Stopper))),
  546    (   thread_get_message(Stopper, http_stopped, [timeout(0.1)])
  547    ->  true
  548    ;   catch(connect(localhost:Port), _, true)
  549    ),
  550    thread_join(Thread, _0Status),
  551    debug(http(stop), 'Joined HTTP server thread ~p (~p)', [Thread, _0Status]),
  552    message_queue_destroy(Queue).
  553
  554connect(Address) :-
  555    setup_call_cleanup(
  556        tcp_socket(Socket),
  557        tcp_connect(Socket, Address),
  558        tcp_close_socket(Socket)).
  559
  560%!  http_enough_workers(+Queue, +Why, +Peer) is det.
  561%
  562%   Check that we have enough workers in our queue. If not, call the
  563%   hook http:schedule_workers/1 to extend  the   worker  pool. This
  564%   predicate can be used by accept_hook/2.
  565
  566http_enough_workers(Queue, _Why, _Peer) :-
  567    message_queue_property(Queue, waiting(_0)),
  568    !,
  569    debug(http(scheduler), '~D waiting for work; ok', [_0]).
  570http_enough_workers(Queue, Why, Peer) :-
  571    message_queue_property(Queue, size(Size)),
  572    (   enough(Size, Why)
  573    ->  debug(http(scheduler), '~D in queue; ok', [Size])
  574    ;   current_server(Port, _, _, Queue, _, _),
  575        Data = _{ port:Port,
  576                  reason:Why,
  577                  peer:Peer,
  578                  waiting:Size,
  579                  queue:Queue
  580                },
  581        debug(http(scheduler), 'Asking to reschedule: ~p', [Data]),
  582        catch(http:schedule_workers(Data),
  583              Error,
  584              print_message(error, Error))
  585    ->  true
  586    ;   true
  587    ).
  588
  589enough(0, _).
  590enough(1, keep_alive).                  % I will be ready myself
  591
  592
  593%!  http:schedule_workers(+Data:dict) is semidet.
  594%
  595%   Hook called if a  new  connection   or  a  keep-alive connection
  596%   cannot be scheduled _immediately_ to a worker. Dict contains the
  597%   following keys:
  598%
  599%     - port:Port
  600%     Port number that identifies the server.
  601%     - reason:Reason
  602%     One of =accept= for a new connection or =keep_alive= if a
  603%     worker tries to reschedule itself.
  604%     - peer:Peer
  605%     Identify the other end of the connection
  606%     - waiting:Size
  607%     Number of messages waiting in the queue.
  608%     - queue:Queue
  609%     Message queue used to dispatch accepted messages.
  610%
  611%   Note that, when called with `reason:accept`,   we  are called in
  612%   the time critical main accept loop.   An  implementation of this
   613%   hook shall typically send the event to a thread dedicated to
  614%   dynamic worker-pool management.
  615%
  616%   @see    http_add_worker/2 may be used to create (temporary) extra
  617%           workers.
  618
  619
  620                 /*******************************
  621                 *    WORKER QUEUE OPERATIONS   *
  622                 *******************************/
  623
  624%!  create_workers(+Options)
  625%
  626%   Create the pool of HTTP worker-threads. Each worker has the
  627%   alias http_worker_N.
  628
  629create_workers(Options) :-
  630    option(workers(N), Options, 5),
  631    option(queue(Queue), Options),
  632    catch(message_queue_create(Queue), _, true),
  633    atom_concat(Queue, '_', AliasBase),
  634    create_workers(1, N, Queue, AliasBase, Options),
  635    assert(queue_options(Queue, Options)).
  636
  637create_workers(I, N, _, _, _) :-
  638    I > N,
  639    !.
  640create_workers(I, N, Queue, AliasBase, Options) :-
  641    gensym(AliasBase, Alias),
  642    thread_create(http_worker(Options), Id,
  643                  [ alias(Alias)
  644                  | Options
  645                  ]),
  646    assertz(queue_worker(Queue, Id)),
  647    I2 is I + 1,
  648    create_workers(I2, N, Queue, AliasBase, Options).
  649
  650
  651%!  resize_pool(+Queue, +Workers) is det.
  652%
  653%   Create or destroy workers. If workers   are  destroyed, the call
  654%   waits until the desired number of waiters is reached.
  655
  656resize_pool(Queue, Size) :-
  657    findall(W, queue_worker(Queue, W), Workers),
  658    length(Workers, Now),
  659    (   Now < Size
  660    ->  queue_options(Queue, Options),
  661        atom_concat(Queue, '_', AliasBase),
  662        I0 is Now+1,
  663        create_workers(I0, Size, Queue, AliasBase, Options)
  664    ;   Now == Size
  665    ->  true
  666    ;   Now > Size
  667    ->  Excess is Now - Size,
  668        thread_self(Me),
  669        forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
  670        forall(between(1, Excess, _), thread_get_message(quitted(_)))
  671    ).
  672
  673
  674%!  http_worker(+Options)
  675%
  676%   Run HTTP worker main loop. Workers   simply  wait until they are
  677%   passed an accepted socket to process  a client.
  678%
  679%   If the message quit(Sender) is read   from the queue, the worker
  680%   stops.
  681
  682http_worker(Options) :-
  683    debug(http(scheduler), 'New worker', []),
  684    prolog_listen(this_thread_exit, done_worker),
  685    option(queue(Queue), Options),
  686    option(max_idle_time(MaxIdle), Options, infinite),
  687    thread_repeat_wait(get_work(Queue, Message, MaxIdle)),
  688      debug(http(worker), 'Waiting for a job ...', []),
  689      debug(http(worker), 'Got job ~p', [Message]),
  690      (   Message = quit(Sender)
  691      ->  !,
  692          thread_self(Self),
  693          thread_detach(Self),
  694          (   Sender == idle
  695          ->  true
  696          ;   retract(queue_worker(Queue, Self)),
  697              thread_send_message(Sender, quitted(Self))
  698          )
  699      ;   open_client(Message, Queue, Goal, In, Out,
  700                      Options, ClientOptions),
  701          (   catch(http_process(Goal, In, Out, ClientOptions),
  702                    Error, true)
  703          ->  true
  704          ;   Error = goal_failed(http_process/4)
  705          ),
  706          (   var(Error)
  707          ->  fail
  708          ;   current_message_level(Error, Level),
  709              print_message(Level, Error),
  710              memberchk(peer(Peer), ClientOptions),
  711              close_connection(Peer, In, Out),
  712              fail
  713          )
  714      ).
  715
  716get_work(Queue, Message, infinite) :-
  717    !,
  718    thread_get_message(Queue, Message).
  719get_work(Queue, Message, MaxIdle) :-
  720    (   thread_get_message(Queue, Message, [timeout(MaxIdle)])
  721    ->  true
  722    ;   Message = quit(idle)
  723    ).
  724
  725
  726%!  open_client(+Message, +Queue, -Goal, -In, -Out,
  727%!              +Options, -ClientOptions) is semidet.
  728%
  729%   Opens the connection to the client in a worker from the message
  730%   sent to the queue by accept_server/2.
  731
  732open_client(requeue(In, Out, Goal, ClOpts),
  733            _, Goal, In, Out, Opts, ClOpts) :-
  734    !,
  735    memberchk(peer(Peer), ClOpts),
  736    option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
  737    check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out).
  738open_client(Message, Queue, Goal, In, Out, Opts,
  739            [ pool(client(Queue, Goal, In, Out)),
  740              timeout(Timeout)
  741            | Options
  742            ]) :-
  743    catch(open_client(Message, Goal, In, Out, Options, Opts),
  744          E, report_error(E)),
  745    option(timeout(Timeout), Opts, 60),
  746    (   debugging(http(connection))
  747    ->  memberchk(peer(Peer), Options),
  748        debug(http(connection), 'Opened connection from ~p', [Peer])
  749    ;   true
  750    ).
  751
  752
  753%!  open_client(+Message, +Goal, -In, -Out,
  754%!              -ClientOptions, +Options) is det.
  755
  756open_client(Message, Goal, In, Out, ClientOptions, Options) :-
  757    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
  758    !.
  759open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
  760            [ peer(Peer),
  761              protocol(http)
  762            ], _) :-
  763    tcp_open_socket(Socket, In, Out).
  764
  765report_error(E) :-
  766    print_message(error, E),
  767    fail.
  768
  769
  770%!  check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet.
  771%
  772%   Wait for the client for at most  TimeOut seconds. Succeed if the
  773%   client starts a new request within   this  time. Otherwise close
  774%   the connection and fail.
  775
  776check_keep_alive_connection(In, TMO, Peer, In, Out) :-
  777    stream_property(In, timeout(Old)),
  778    set_stream(In, timeout(TMO)),
  779    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
  780    catch(peek_code(In, Code), E, true),
  781    (   var(E),                     % no exception
  782        Code \== -1                 % no end-of-file
  783    ->  set_stream(In, timeout(Old)),
  784        debug(http(keep_alive), '\tre-using keep-alive connection', [])
  785    ;   (   Code == -1
  786        ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
  787        ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
  788        ),
  789        close_connection(Peer, In, Out),
  790        fail
  791    ).
  792
  793
  794%!  done_worker
  795%
  796%   Called when worker is terminated  due   to  http_workers/2  or a
  797%   (debugging) exception. In  the   latter  case, recreate_worker/2
  798%   creates a new worker.
  799
  800done_worker :-
  801    thread_self(Self),
  802    thread_detach(Self),
  803    retract(queue_worker(Queue, Self)),
  804    thread_property(Self, status(Status)),
  805    !,
  806    (   catch(recreate_worker(Status, Queue), _, fail)
  807    ->  print_message(informational,
  808                      httpd_restarted_worker(Self))
  809    ;   done_status_message_level(Status, Level),
  810        print_message(Level,
  811                      httpd_stopped_worker(Self, Status))
  812    ).
  813done_worker :-                                  % received quit(Sender)
  814    thread_self(Self),
  815    thread_property(Self, status(Status)),
  816    done_status_message_level(Status, Level),
  817    print_message(Level,
  818                  httpd_stopped_worker(Self, Status)).
  819
  820done_status_message_level(true, silent) :- !.
  821done_status_message_level(exception('$aborted'), silent) :- !.
  822done_status_message_level(_, informational).
  823
  824
  825%!  recreate_worker(+Status, +Queue) is semidet.
  826%
  827%   Deal with the possibility  that   threads  are,  during development,
  828%   killed with abort/0. We recreate the worker to avoid that eventually
  829%   we run out of workers. If  we  are   aborted  due  to a halt/0 call,
  830%   thread_create/3 will raise a permission error.
  831%
  832%   The first clause deals with the possibility  that we cannot write to
  833%   `user_error`. This is possible when Prolog   is started as a service
  834%   using some service managers. Would be  nice   if  we  could write an
  835%   error, but where?
  836
  837recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
  838    halt(2).
  839recreate_worker(exception(Error), Queue) :-
  840    recreate_on_error(Error),
  841    queue_options(Queue, Options),
  842    atom_concat(Queue, '_', AliasBase),
  843    create_workers(1, 1, Queue, AliasBase, Options).
  844
  845recreate_on_error('$aborted').
  846recreate_on_error(time_limit_exceeded).
  847
  848%!  thread_httpd:message_level(+Exception, -Level)
  849%
  850%   Determine the message stream used  for   exceptions  that  may occur
  851%   during server_loop/5. Being multifile, clauses can   be added by the
  852%   application to refine error handling.   See  also message_hook/3 for
  853%   further programming error handling.
  854
  855:- multifile
  856    message_level/2.  857
  858message_level(error(io_error(read, _), _),               silent).
  859message_level(error(socket_error(epipe,_), _),           silent).
  860message_level(error(http_write_short(_Obj,_Written), _), silent).
  861message_level(error(timeout_error(read, _), _),          informational).
  862message_level(keep_alive_timeout,                        silent).
  863
  864current_message_level(Term, Level) :-
  865    (   message_level(Term, Level)
  866    ->  true
  867    ;   Level = error
  868    ).
  869
  870
  871%!  http_requeue(+Header)
  872%
  873%   Re-queue a connection to  the  worker   pool.  This  deals  with
  874%   processing additional requests on keep-alive connections.
  875
  876http_requeue(Header) :-
  877    requeue_header(Header, ClientOptions),
  878    memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
  879    memberchk(peer(Peer), ClientOptions),
  880    http_enough_workers(Queue, keep_alive, Peer),
  881    thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
  882    !.
  883http_requeue(Header) :-
  884    debug(http(error), 'Re-queue failed: ~p', [Header]),
  885    fail.
  886
  887requeue_header([], []).
  888requeue_header([H|T0], [H|T]) :-
  889    requeue_keep(H),
  890    !,
  891    requeue_header(T0, T).
  892requeue_header([_|T0], T) :-
  893    requeue_header(T0, T).
  894
  895requeue_keep(pool(_)).
  896requeue_keep(peer(_)).
  897requeue_keep(protocol(_)).
  898
  899
  900%!  http_process(Message, Queue, +Options)
  901%
  902%   Handle a single client message on the given stream.
  903
  904http_process(Goal, In, Out, Options) :-
  905    debug(http(server), 'Running server goal ~p on ~p -> ~p',
  906          [Goal, In, Out]),
  907    option(timeout(TMO), Options, 60),
  908    set_stream(In, timeout(TMO)),
  909    set_stream(Out, timeout(TMO)),
  910    http_wrapper(Goal, In, Out, Connection,
  911                 [ request(Request)
  912                 | Options
  913                 ]),
  914    next(Connection, Request).
  915
  916next(Connection, Request) :-
  917    next_(Connection, Request), !.
  918next(Connection, Request) :-
  919    print_message(warning, goal_failed(next(Connection,Request))).
  920
  921next_(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
  922    !,
  923    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
  924    (   catch(call(SwitchGoal, In, Out), E,
  925              (   print_message(error, E),
  926                  fail))
  927    ->  true
  928    ;   http_close_connection(Request)
  929    ).
  930next_(spawned(ThreadId), _) :-
  931    !,
  932    debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
  933next_(Connection, Request) :-
  934    downcase_atom(Connection, 'keep-alive'),
  935    http_requeue(Request),
  936    !.
  937next_(_, Request) :-
  938    http_close_connection(Request).
  939
  940
  941%!  http_close_connection(+Request)
  942%
  943%   Close connection associated to Request.  See also http_requeue/1.
  944
  945http_close_connection(Request) :-
  946    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
  947    memberchk(peer(Peer), Request),
  948    close_connection(Peer, In, Out).
  949
  950%!  close_connection(+Peer, +In, +Out)
  951%
  952%   Closes the connection from the server to the client.  Errors are
  953%   currently silently ignored.
  954
  955close_connection(Peer, In, Out) :-
  956    debug(http(connection), 'Closing connection from ~p', [Peer]),
  957    catch(close(In, [force(true)]), _, true),
  958    catch(close(Out, [force(true)]), _, true).
  959
  960%!  http_spawn(:Goal, +Options) is det.
  961%
  962%   Continue this connection on a  new   thread.  A handler may call
  963%   http_spawn/2 to start a new thread that continues processing the
  964%   current request using Goal. The original   thread returns to the
  965%   worker pool for processing new requests.   Options are passed to
  966%   thread_create/3, except for:
  967%
  968%       * pool(+Pool)
  969%       Interfaces to library(thread_pool), starting the thread
  970%       on the given pool.
  971%
  972%   If a pool does not exist, this predicate calls the multifile
  973%   hook http:create_pool/1 to create it. If this predicate succeeds
  974%   the operation is retried.
  975
  976http_spawn(Goal, Options) :-
  977    select_option(pool(Pool), Options, ThreadOptions),
  978    !,
  979    current_output(CGI),
  980    catch(thread_create_in_pool(Pool,
  981                                wrap_spawned(CGI, Goal), Id,
  982                                [ detached(true)
  983                                | ThreadOptions
  984                                ]),
  985          Error,
  986          true),
  987    (   var(Error)
  988    ->  http_spawned(Id)
  989    ;   Error = error(resource_error(threads_in_pool(_)), _)
  990    ->  throw(http_reply(busy))
  991    ;   Error = error(existence_error(thread_pool, Pool), _),
  992        create_pool(Pool)
  993    ->  http_spawn(Goal, Options)
  994    ;   throw(Error)
  995    ).
  996http_spawn(Goal, Options) :-
  997    current_output(CGI),
  998    thread_create(wrap_spawned(CGI, Goal), Id,
  999                  [ detached(true)
 1000                  | Options
 1001                  ]),
 1002    http_spawned(Id).
 1003
 1004wrap_spawned(CGI, Goal) :-
 1005    set_output(CGI),
 1006    http_wrap_spawned(Goal, Request, Connection),
 1007    next(Connection, Request).
 1008
 1009%!  create_pool(+Pool)
 1010%
 1011%   Lazy  creation  of  worker-pools  for   the  HTTP  server.  This
 1012%   predicate calls the hook http:create_pool/1.   If the hook fails
 1013%   it creates a default pool of size   10. This should suffice most
 1014%   typical usecases. Note that we  get   a  permission error if the
 1015%   pool is already created.  We can ignore this.
 1016
 1017create_pool(Pool) :-
 1018    E = error(permission_error(create, thread_pool, Pool), _),
 1019    catch(http:create_pool(Pool), E, true).
 1020create_pool(Pool) :-
 1021    print_message(informational, httpd(created_pool(Pool))),
 1022    thread_pool_create(Pool, 10, []).
 1023
 1024
 1025		 /*******************************
 1026		 *         WAIT POLICIES	*
 1027		 *******************************/
 1028
 1029:- meta_predicate
 1030    thread_repeat_wait(0). 1031
 1032%!  thread_repeat_wait(:Goal) is multi.
 1033%
 1034%   Acts as `repeat,  thread_idle(Goal)`,  choosing   whether  to  use a
 1035%   `long` or `short` idle time based on the average firing rate.
 1036
 1037thread_repeat_wait(Goal) :-
 1038    new_rate_mma(5, 1000, State),
 1039    repeat,
 1040      update_rate_mma(State, MMA),
 1041      long(MMA, IsLong),
 1042      (   IsLong == brief
 1043      ->  call(Goal)
 1044      ;   thread_idle(Goal, IsLong)
 1045      ).
 1046
 1047long(MMA, brief) :-
 1048    MMA < 0.05,
 1049    !.
 1050long(MMA, short) :-
 1051    MMA < 1,
 1052    !.
 1053long(_, long).
 1054
 1055%!  new_rate_mma(+N, +Resolution, -State) is det.
 1056%!  update_rate_mma(!State, -MMA) is det.
 1057%
 1058%   Implement _Modified Moving  Average_  computing   the  average  time
 1059%   between requests as an exponential moving averate with alpha=1/N.
 1060%
 1061%   @arg Resolution is the time resolution  in 1/Resolution seconds. All
 1062%   storage is done in integers to avoid  the need for stack freezing in
 1063%   nb_setarg/3.
 1064%
 1065%   @see https://en.wikipedia.org/wiki/Moving_average
 1066
 1067new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :-
 1068    current_prolog_flag(max_tagged_integer, MaxI),
 1069    get_time(Base).
 1070
 1071update_rate_mma(State, MMAr) :-
 1072    State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
 1073    get_time(Now),
 1074    Stamp is round((Now-Base)*Resolution),
 1075    (   Stamp > MaxI
 1076    ->  nb_setarg(1, State, Now),
 1077        nb_setarg(2, State, 0)
 1078    ;   true
 1079    ),
 1080    Diff is Stamp-Last,
 1081    nb_setarg(2, State, Stamp),
 1082    MMA is round(((N-1)*MMA0+Diff)/N),
 1083    nb_setarg(6, State, MMA),
 1084    MMAr is MMA/float(Resolution).
 1085
 1086
 1087                 /*******************************
 1088                 *            MESSAGES          *
 1089                 *******************************/
 1090
 1091:- multifile
 1092    prolog:message/3. 1093
 1094prolog:message(httpd_started_server(Port, Options)) -->
 1095    [ 'Started server at '-[] ],
 1096    http_root(Port, Options).
 1097prolog:message(httpd_stopped_worker(Self, Status)) -->
 1098    [ 'Stopped worker ~p: ~p'-[Self, Status] ].
 1099prolog:message(httpd_restarted_worker(Self)) -->
 1100    [ 'Replaced aborted worker ~p'-[Self] ].
 1101prolog:message(httpd(created_pool(Pool))) -->
 1102    [ 'Created thread-pool ~p of size 10'-[Pool], nl,
 1103      'Create this pool at startup-time or define the hook ', nl,
 1104      'http:create_pool/1 to avoid this message and create a ', nl,
 1105      'pool that fits the usage-profile.'
 1106    ].
 1107
 1108http_root(Address, Options) -->
 1109    { landing_page(Address, URI, Options) },
 1110    [ '~w'-[URI] ].
 1111
 1112landing_page(Host:Port, URI, Options) :-
 1113    !,
 1114    must_be(atom, Host),
 1115    must_be(integer, Port),
 1116    http_server_property(Port, scheme(Scheme)),
 1117    (   default_port(Scheme, Port)
 1118    ->  format(atom(Base), '~w://~w', [Scheme, Host])
 1119    ;   format(atom(Base), '~w://~w:~w', [Scheme, Host, Port])
 1120    ),
 1121    entry_page(Base, URI, Options).
 1122landing_page(unix_socket(Path), URI, _Options) :-
 1123    !,
 1124    format(string(URI), 'Unix domain socket "~w"', [Path]).
 1125landing_page(Port, URI, Options) :-
 1126    landing_page(localhost:Port, URI, Options).
 1127
 1128default_port(http, 80).
 1129default_port(https, 443).
 1130
 1131entry_page(Base, URI, Options) :-
 1132    option(entry_page(Entry), Options),
 1133    !,
 1134    uri_resolve(Entry, Base, URI).
 1135entry_page(Base, URI, _) :-
 1136    http_absolute_location(root(.), Entry, []),
 1137    uri_resolve(Entry, Base, URI)