View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2007-2025, University of Amsterdam
    7                              VU University Amsterdam
    8                              CWI, Amsterdam
    9                              SWI-Prolog Solutions b.v.
   10    All rights reserved.
   11
   12    Redistribution and use in source and binary forms, with or without
   13    modification, are permitted provided that the following conditions
   14    are met:
   15
   16    1. Redistributions of source code must retain the above copyright
   17       notice, this list of conditions and the following disclaimer.
   18
   19    2. Redistributions in binary form must reproduce the above copyright
   20       notice, this list of conditions and the following disclaimer in
   21       the documentation and/or other materials provided with the
   22       distribution.
   23
   24    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   25    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   26    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   27    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   28    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   29    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   30    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   31    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   32    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   33    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   34    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   35    POSSIBILITY OF SUCH DAMAGE.
   36*/
   37
   38:- module(thread,
   39          [ concurrent/3,               % +Threads, :Goals, +Options
   40            concurrent_maplist/2,       % :Goal, +List
   41            concurrent_maplist/3,       % :Goal, ?List1, ?List2
   42            concurrent_maplist/4,       % :Goal, ?List1, ?List2, ?List3
   43            concurrent_forall/2,        % :Generate, :Test
   44            concurrent_forall/3,        % :Generate, :Test, +Options
   45            concurrent_and/2,           % :Generator,:Test
   46            concurrent_and/3,           % :Generator,:Test,+Options
   47            first_solution/3,           % -Var, :Goals, +Options
   48
   49            call_in_thread/2,           % +Thread, :Goal
   50            call_in_thread/3            % +Thread, :Goal, +Options
   51          ]).   52:- autoload(library(apply), [maplist/2, maplist/3, maplist/4, maplist/5]).   53:- autoload(library(error), [must_be/2, instantiation_error/1]).   54:- autoload(library(lists), [subtract/3, same_length/2, nth0/3]).   55:- autoload(library(option), [option/2, option/3, meta_options/3]).   56:- autoload(library(ordsets), [ord_intersection/3, ord_union/3]).   57:- use_module(library(debug), [debug/3, assertion/1]).   58
   59%:- debug(concurrent).
   60
   61:- meta_predicate
   62    concurrent(+, :, +),
   63    concurrent_maplist(1, +),
   64    concurrent_maplist(2, ?, ?),
   65    concurrent_maplist(3, ?, ?, ?),
   66    concurrent_forall(0, 0),
   67    concurrent_forall(0, 0, +),
   68    concurrent_and(0, 0),
   69    concurrent_and(0, 0, +),
   70    first_solution(-, :, +),
   71    call_in_thread(+, 0),
   72    call_in_thread(+, 0, :).   73
   74
   75:- predicate_options(concurrent/3, 3,
   76                     [ pass_to(system:thread_create/3, 3)
   77                     ]).   78:- predicate_options(concurrent_forall/3, 3,
   79                     [ threads(nonneg)
   80                     ]).   81:- predicate_options(concurrent_and/3, 3,
   82                     [ threads(nonneg)
   83                     ]).   84:- predicate_options(first_solution/3, 3,
   85                     [ on_fail(oneof([stop,continue])),
   86                       on_error(oneof([stop,continue])),
   87                       pass_to(system:thread_create/3, 3)
   88                     ]).   89
   90/** <module> High level thread primitives
   91
   92This  module  defines  simple  to  use   predicates  for  running  goals
   93concurrently.  Where  the  core  multi-threaded    API  is  targeted  at
   94communicating long-living threads, the predicates   here  are defined to
   95run goals concurrently without having to   deal with thread creation and
   96maintenance explicitely.
   97
   98Note that these predicates run goals   concurrently  and therefore these
   99goals need to be thread-safe. As  the   predicates  in  this module also
  100abort branches of the computation that  are no longer needed, predicates
  101that have side-effect must act properly.  In   a  nutshell, this has the
  102following consequences:
  103
  104  * Nice clean Prolog code without side-effects (but with cut) works
  105    fine.
  106  * Side-effects are bad news.  If you really need assert to store
  107    intermediate results, use the thread_local/1 declaration.  This
  108    also guarantees cleanup of left-over clauses if the thread is
  109    cancelled.  For other side-effects, make sure to use call_cleanup/2
  110    to undo them should the thread be cancelled.
  111  * Global variables are ok as they are thread-local and destroyed
  112    on thread cancellation.  Note however that global variables in
  113    the calling thread are *not* available in the threads that are
  114    created.  You have to pass the value as an argument and initialise
  115    the variable in the new thread.
  116  * Thread-cancellation uses thread_signal/2.  Using this code
  117    with long-blocking foreign predicates may result in long delays,
  118    even if another thread asks for cancellation.
  119
  120@author Jan Wielemaker
  121*/
  122
  123%!  concurrent(+N, :Goals, +Options) is semidet.
  124%
  125%   Run Goals in parallel using N   threads.  This call blocks until
  126%   all work has been done.  The   Goals  must  be independent. They
  127%   should not communicate using shared  variables   or  any form of
  128%   global data. All Goals must be thread-safe.
  129%
  130%   Execution succeeds if all goals  have   succeeded.  If  one goal
  131%   fails or throws an exception,  other   workers  are abandoned as
  132%   soon as possible and the entire   computation fails or re-throws
  133%   the exception. Note that if  multiple   goals  fail  or raise an
  134%   error it is not defined which error or failure is reported.
  135%
  136%   On successful completion, variable bindings   are returned. Note
  137%   however that threads have independent   stacks and therefore the
  138%   goal is copied to the worker  thread   and  the result is copied
  139%   back to the caller of concurrent/3.
  140%
  141%   Choosing the right number of threads is not always obvious. Here
  142%   are some scenarios:
  143%
  144%     * If the goals are CPU intensive and normally all succeeding,
  145%     typically the number of CPUs is the optimal number of
  146%     threads.  Less does not use all CPUs, more wastes time in
  147%     context switches and also uses more memory.
  148%
  149%     * If the tasks are I/O bound the number of threads is
  150%     typically higher than the number of CPUs.
  151%
  152%     * If one or more of the goals may fail or produce an error,
  153%     using a higher number of threads may find this earlier.
  154%
  155%   @arg N Number of worker-threads to create. Using 1, no threads
  156%        are created.  If N is larger than the number of Goals we
  157%        create exactly as many threads as there are Goals.
  158%   @arg Goals List of callable terms.
  159%   @arg Options Passed to thread_create/3 for creating the
  160%        workers.  Only options changing the stack-sizes can
  161%        be used. In particular, do not pass the detached or alias
  162%        options.
  163%   @see In many cases, concurrent_maplist/2 and friends
  164%        is easier to program and is tractable to program
  165%        analysis.
  166
  167concurrent(1, M:List, _) :-
  168    !,
  169    maplist(once_in_module(M), List).
  170concurrent(N, M:List, Options) :-
  171    must_be(positive_integer, N),
  172    must_be(list(callable), List),
  173    length(List, JobCount),
  174    message_queue_create(Done),
  175    message_queue_create(Queue),
  176    WorkerCount is min(N, JobCount),
  177    create_workers(WorkerCount, Queue, Done, Workers, Options),
  178    submit_goals(List, 1, M, Queue, VarList),
  179    forall(between(1, WorkerCount, _),
  180           thread_send_message(Queue, done)),
  181    VT =.. [vars|VarList],
  182    concur_wait(JobCount, Done, VT, cleanup(Workers, Queue),
  183                Result, [], Exitted),
  184    subtract(Workers, Exitted, RemainingWorkers),
  185    concur_cleanup(Result, RemainingWorkers, [Queue, Done]),
  186    (   Result == true
  187    ->  true
  188    ;   Result = false
  189    ->  fail
  190    ;   Result = exception(Error)
  191    ->  throw(Error)
  192    ).
  193
  194once_in_module(M, Goal) :-
  195    call(M:Goal), !.
  196
  197%!  submit_goals(+List, +Id0, +Module, +Queue, -Vars) is det.
  198%
  199%   Send all jobs from List to Queue. Each goal is added to Queue as
  200%   a term goal(Id, Goal, Vars). Vars  is   unified  with  a list of
  201%   lists of free variables appearing in each goal.
  202
  203submit_goals([], _, _, _, []).
  204submit_goals([H|T], I, M, Queue, [Vars|VT]) :-
  205    term_variables(H, Vars),
  206    thread_send_message(Queue, goal(I, M:H, Vars)),
  207    I2 is I + 1,
  208    submit_goals(T, I2, M, Queue, VT).
  209
  210
  211%!  concur_wait(+N, +Done:queue, +VT:compound, +Cleanup,
  212%!              -Result, +Exitted0, -Exitted) is semidet.
  213%
  214%   Wait for completion, failure or error.
  215%
  216%   @arg Exited List of thread-ids with threads that completed
  217%   before all work was done.
  218
  219concur_wait(0, _, _, _, true, Exited, Exited) :- !.
  220concur_wait(N, Done, VT, Cleanup, Status, Exitted0, Exitted) :-
  221    debug(concurrent, 'Concurrent: waiting for workers ...', []),
  222    catch(thread_get_message(Done, Exit), Error,
  223          concur_abort(Error, Cleanup, Done, Exitted0)),
  224    debug(concurrent, 'Waiting: received ~p', [Exit]),
  225    (   Exit = done(Id, Vars)
  226    ->  debug(concurrent, 'Concurrent: Job ~p completed with ~p', [Id, Vars]),
  227        arg(Id, VT, Vars),
  228        N2 is N - 1,
  229        concur_wait(N2, Done, VT, Cleanup, Status, Exitted0, Exitted)
  230    ;   Exit = finished(Thread)
  231    ->  thread_join(Thread, JoinStatus),
  232        debug(concurrent, 'Concurrent: waiter ~p joined: ~p',
  233              [Thread, JoinStatus]),
  234        (   JoinStatus == true
  235        ->  concur_wait(N, Done, VT, Cleanup, Status, [Thread|Exitted0], Exitted)
  236        ;   Status = JoinStatus,
  237            Exitted = [Thread|Exitted0]
  238        )
  239    ).
  240
  241concur_abort(Error, cleanup(Workers, Queue), Done, Exitted) :-
  242    debug(concurrent, 'Concurrent: got ~p', [Error]),
  243    subtract(Workers, Exitted, RemainingWorkers),
  244    concur_cleanup(Error, RemainingWorkers, [Queue, Done]),
  245    throw(Error).
  246
  247create_workers(N, Queue, Done, [Id|Ids], Options) :-
  248    N > 0,
  249    !,
  250    thread_create(worker(Queue, Done), Id,
  251                  [ at_exit(thread_send_message(Done, finished(Id)))
  252                  | Options
  253                  ]),
  254    N2 is N - 1,
  255    create_workers(N2, Queue, Done, Ids, Options).
  256create_workers(_, _, _, [], _).
  257
  258
  259%!  worker(+WorkQueue, +DoneQueue) is det.
  260%
  261%   Process jobs from WorkQueue and send the results to DoneQueue.
  262
  263worker(Queue, Done) :-
  264    thread_get_message(Queue, Message),
  265    debug(concurrent, 'Worker: received ~p', [Message]),
  266    (   Message = goal(Id, Goal, Vars)
  267    ->  (   Goal
  268        ->  thread_send_message(Done, done(Id, Vars)),
  269            worker(Queue, Done)
  270        )
  271    ;   true
  272    ).
  273
  274
  275%!  concur_cleanup(+Result, +Workers:list, +Queues:list) is det.
  276%
  277%   Cleanup the concurrent workers and message  queues. If Result is
  278%   not =true=, signal all workers to make them stop prematurely. If
  279%   result is true we assume  all   workers  have been instructed to
  280%   stop or have stopped themselves.
  281
  282concur_cleanup(Result, Workers, Queues) :-
  283    !,
  284    (   Result == true
  285    ->  true
  286    ;   kill_workers(Workers)
  287    ),
  288    join_all(Workers),
  289    maplist(message_queue_destroy, Queues).
  290
  291kill_workers([]).
  292kill_workers([Id|T]) :-
  293    debug(concurrent, 'Signalling ~w', [Id]),
  294    catch(thread_signal(Id, abort), _, true),
  295    kill_workers(T).
  296
  297join_all([]).
  298join_all([Id|T]) :-
  299    thread_join(Id, _),
  300    join_all(T).
  301
  302
  303		 /*******************************
  304		 *             FORALL		*
  305		 *******************************/
  306
  307%!  concurrent_forall(:Generate, :Action) is semidet.
  308%!  concurrent_forall(:Generate, :Action, +Options) is semidet.
  309%
  310%   True when Action is true for all solutions of Generate. This has the
  311%   same semantics as forall/2, but  the   Action  goals are executed in
  312%   multiple threads. Notable a failing Action   or a Action throwing an
  313%   exception signals the calling  thread  which   in  turn  aborts  all
  314%   workers and fails or re-throws the generated error. Options:
  315%
  316%     - threads(+Count)
  317%       Number of threads to use.  The default is determined by the
  318%       Prolog flag `cpu_count`.
  319%
  320%   @tbd Ideally we would grow the   set of workers dynamically, similar
  321%   to dynamic scheduling of  HTTP  worker   threads.  This  would avoid
  322%   creating threads that are never used if Generate is too slow or does
  323%   not provide enough answers and  would   further  raise the number of
  324%   threads if Action is I/O bound rather than CPU bound.
  325
  326:- dynamic
  327    fa_aborted/1.  328
  329concurrent_forall(Generate, Test) :-
  330    concurrent_forall(Generate, Test, []).
  331
  332concurrent_forall(Generate, Test, Options) :-
  333    jobs(Jobs, Options),
  334    Jobs > 1,
  335    !,
  336    term_variables(Generate, GVars),
  337    term_variables(Test, TVars),
  338    sort(GVars, GVarsS),
  339    sort(TVars, TVarsS),
  340    ord_intersection(GVarsS, TVarsS, Shared),
  341    Templ =.. [v|Shared],
  342    MaxSize is Jobs*4,
  343    message_queue_create(Q, [max_size(MaxSize)]),
  344    length(Workers, Jobs),
  345    thread_self(Me),
  346    maplist(thread_create(fa_worker(Q, Me, Templ, Test)), Workers),
  347    catch(( forall(Generate,
  348                   thread_send_message(Q, job(Templ))),
  349            forall(between(1, Jobs, _),
  350                   thread_send_message(Q, done)),
  351            maplist(thread_join, Workers),
  352            message_queue_destroy(Q)
  353          ),
  354          Error,
  355          fa_cleanup(Error, Workers, Q)).
  356concurrent_forall(Generate, Test, _) :-
  357    forall(Generate, Test).
  358
  359fa_cleanup(Error, Workers, Q) :-
  360    maplist(safe_abort, Workers),
  361    debug(concurrent(fail), 'Joining workers', []),
  362    maplist(safe_join, Workers),
  363    debug(concurrent(fail), 'Destroying queue', []),
  364    retractall(fa_aborted(Q)),
  365    message_queue_destroy(Q),
  366    (   Error = fa_worker_failed(_0Test, Why)
  367    ->  debug(concurrent(fail), 'Test ~p failed: ~p', [_0Test, Why]),
  368        (   Why == false
  369        ->  fail
  370        ;   Why = error(E)
  371        ->  throw(E)
  372        ;   assertion(fail)
  373        )
  374    ;   throw(Error)
  375    ).
  376
  377fa_worker(Queue, Main, Templ, Test) :-
  378    repeat,
  379    thread_get_message(Queue, Msg),
  380    (   Msg == done
  381    ->  !
  382    ;   Msg = job(Templ),
  383        debug(concurrent, 'Running test ~p', [Test]),
  384        (   catch_with_backtrace(Test, E, true)
  385        ->  (   var(E)
  386            ->  fail
  387            ;   fa_stop(Queue, Main, fa_worker_failed(Test, error(E)))
  388            )
  389        ;   !,
  390            fa_stop(Queue, Main, fa_worker_failed(Test, false))
  391        )
  392    ).
  393
  394fa_stop(Queue, Main, Why) :-
  395    with_mutex('$concurrent_forall',
  396               fa_stop_sync(Queue, Main, Why)).
  397
  398fa_stop_sync(Queue, _Main, _Why) :-
  399    fa_aborted(Queue),
  400    !.
  401fa_stop_sync(Queue, Main, Why) :-
  402    asserta(fa_aborted(Queue)),
  403    debug(concurrent(fail), 'Stop due to ~p. Signalling ~q', [Why, Main]),
  404    thread_signal(Main, throw(Why)).
  405
  406jobs(Jobs, Options) :-
  407    (   option(threads(Jobs), Options)
  408    ->  true
  409    ;   current_prolog_flag(cpu_count, Jobs)
  410    ->  true
  411    ;   Jobs = 1
  412    ).
  413
  414safe_abort(Thread) :-
  415    catch(thread_signal(Thread, abort), error(_,_), true).
  416safe_join(Thread) :-
  417    E = error(_,_),
  418    catch(thread_join(Thread, _Status), E, true).
  419
  420
  421		 /*******************************
  422		 *              AND		*
  423		 *******************************/
  424
  425%!  concurrent_and(:Generator, :Test).
  426%!  concurrent_and(:Generator, :Test, +Options).
  427%
  428%   Concurrent version of `(Generator,Test)`. This   predicate creates a
  429%   thread providing solutions for Generator that   are handed to a pool
  430%   of threads that run Test for   the different instantiations provided
  431%   by Generator concurrently. The predicate  is logically equivalent to
  432%   a simple conjunction except for two  aspects: (1) terms are _copied_
  433%   from Generator to the test  Test   threads  while answers are copied
  434%   back to the calling thread and (2)   answers  may be produced out of
  435%   order.
  436%
  437%   If   the   evaluation   of   some    Test   raises   an   exception,
  438%   concurrent_and/2,3 is terminated with this  exception. If the caller
  439%   commits  after  a  given  answer  or    raises  an  exception  while
  440%   concurrent_and/2,3  is  active  with  pending   choice  points,  all
  441%   involved resources are reclaimed.
  442%
  443%   Options:
  444%
  445%     - threads(+Count)
  446%       Create a worker pool holding Count threads.  The default is
  447%       the Prolog flag `cpu_count`.
  448%
  449%   This    predicate    was    proposed     by      Jan     Burse    as
  450%   balance((Generator,Test)).
  451
  452concurrent_and(Gen, Test) :-
  453    concurrent_and(Gen, Test, []).
  454
  455concurrent_and(Gen, Test, Options) :-
  456    jobs(Jobs, Options),
  457    MaxSize is Jobs*4,
  458    message_queue_create(JobQueue, [max_size(MaxSize)]),
  459    message_queue_create(AnswerQueue, [max_size(MaxSize)]),
  460    ca_template(Gen, Test, Templ),
  461    term_variables(Gen+Test, AllVars),
  462    ReplyTempl =.. [v|AllVars],
  463    length(Workers, Jobs),
  464    Alive is 1<<Jobs-1,
  465    maplist(thread_create(ca_worker(JobQueue, AnswerQueue,
  466                                    Templ, Test, ReplyTempl)),
  467            Workers),
  468    thread_create(ca_generator(Gen, Templ, JobQueue, AnswerQueue),
  469                  GenThread),
  470    State = state(Alive),
  471    call_cleanup(
  472        ca_gather(State, AnswerQueue, ReplyTempl, Workers),
  473        ca_cleanup(GenThread, Workers, JobQueue, AnswerQueue)).
  474
  475ca_gather(State, AnswerQueue, ReplyTempl, Workers) :-
  476    repeat,
  477       thread_get_message(AnswerQueue, Msg),
  478       (   Msg = true(ReplyTempl)
  479       ->  true
  480       ;   Msg = done(Worker)
  481       ->  nth0(Done, Workers, Worker),
  482           arg(1, State, Alive0),
  483           Alive1 is Alive0 /\ \(1<<Done),
  484           debug(concurrent(and), 'Alive = ~2r', [Alive1]),
  485           (   Alive1 =:= 0
  486           ->  !,
  487               fail
  488           ;   nb_setarg(1, State, Alive1),
  489               fail
  490           )
  491       ;   Msg = error(E)
  492       ->  throw(E)
  493       ).
  494
  495ca_template(Gen, Test, Templ) :-
  496    term_variables(Gen,  GVars),
  497    term_variables(Test, TVars),
  498    sort(GVars, GVarsS),
  499    sort(TVars, TVarsS),
  500    ord_intersection(GVarsS, TVarsS, Shared),
  501    ord_union(GVarsS, Shared, TemplVars),
  502    Templ =.. [v|TemplVars].
  503
  504ca_worker(JobQueue, AnswerQueue, Templ, Test, ReplyTempl) :-
  505    thread_self(Me),
  506    EG = error(existence_error(message_queue, _), _),
  507    repeat,
  508    catch(thread_get_message(JobQueue, Req), EG, Req=all_done),
  509    (   Req = job(Templ)
  510    ->  (   catch(Test, E, true),
  511            (   var(E)
  512            ->  thread_send_message(AnswerQueue, true(ReplyTempl))
  513            ;   thread_send_message(AnswerQueue, error(E))
  514            ),
  515            fail
  516        )
  517    ;   Req == done
  518    ->  !,
  519        message_queue_destroy(JobQueue),
  520        thread_send_message(AnswerQueue, done(Me))
  521    ;   assertion(Req == all_done)
  522    ->  !,
  523        thread_send_message(AnswerQueue, done(Me))
  524    ).
  525
  526ca_generator(Gen, Templ, JobQueue, AnswerQueue) :-
  527    (   catch(Gen, E, true),
  528        (   var(E)
  529        ->  thread_send_message(JobQueue, job(Templ))
  530        ;   thread_send_message(AnswerQueue, error(E))
  531        ),
  532        fail
  533    ;   thread_send_message(JobQueue, done)
  534    ).
  535
  536ca_cleanup(GenThread, Workers, JobQueue, AnswerQueue) :-
  537    safe_abort(GenThread),
  538    safe_join(GenThread),
  539    maplist(safe_abort, Workers),
  540    maplist(safe_join, Workers),
  541    message_queue_destroy(AnswerQueue),
  542    catch(message_queue_destroy(JobQueue), error(_,_), true).
  543
  544
  545                 /*******************************
  546                 *             MAPLIST          *
  547                 *******************************/
  548
  549%!  concurrent_maplist(:Goal, +List) is semidet.
  550%!  concurrent_maplist(:Goal, +List1, +List2) is semidet.
  551%!  concurrent_maplist(:Goal, +List1, +List2, +List3) is semidet.
  552%
  553%   Concurrent version of maplist/2. This   predicate uses concurrent/3,
  554%   using multiple _worker_ threads.  The  number   of  threads  is  the
  555%   minimum of the list length and the   number  of cores available. The
  556%   number of cores is determined using  the prolog flag =cpu_count=. If
  557%   this flag is absent or 1 or List   has  less than two elements, this
  558%   predicate calls the corresponding maplist/N  version using a wrapper
  559%   based on once/1. Note that all goals   are executed as if wrapped in
  560%   once/1 and therefore these predicates are _semidet_.
  561%
  562%   Note that the the overhead  of   this  predicate is considerable and
  563%   therefore Goal must  be  fairly  expensive   before  one  reaches  a
  564%   speedup.
  565
  566concurrent_maplist(Goal, List) :-
  567    workers(List, WorkerCount),
  568    !,
  569    maplist(ml_goal(Goal), List, Goals),
  570    concurrent(WorkerCount, Goals, []).
  571concurrent_maplist(M:Goal, List) :-
  572    maplist(once_in_module(M, Goal), List).
  573
  574once_in_module(M, Goal, Arg) :-
  575    call(M:Goal, Arg), !.
  576
  577ml_goal(Goal, Elem, call(Goal, Elem)).
  578
  579concurrent_maplist(Goal, List1, List2) :-
  580    same_length(List1, List2),
  581    workers(List1, WorkerCount),
  582    !,
  583    maplist(ml_goal(Goal), List1, List2, Goals),
  584    concurrent(WorkerCount, Goals, []).
  585concurrent_maplist(M:Goal, List1, List2) :-
  586    maplist(once_in_module(M, Goal), List1, List2).
  587
  588once_in_module(M, Goal, Arg1, Arg2) :-
  589    call(M:Goal, Arg1, Arg2), !.
  590
  591ml_goal(Goal, Elem1, Elem2, call(Goal, Elem1, Elem2)).
  592
  593concurrent_maplist(Goal, List1, List2, List3) :-
  594    same_length(List1, List2, List3),
  595    workers(List1, WorkerCount),
  596    !,
  597    maplist(ml_goal(Goal), List1, List2, List3, Goals),
  598    concurrent(WorkerCount, Goals, []).
  599concurrent_maplist(M:Goal, List1, List2, List3) :-
  600    maplist(once_in_module(M, Goal), List1, List2, List3).
  601
  602once_in_module(M, Goal, Arg1, Arg2, Arg3) :-
  603    call(M:Goal, Arg1, Arg2, Arg3), !.
  604
  605ml_goal(Goal, Elem1, Elem2, Elem3, call(Goal, Elem1, Elem2, Elem3)).
  606
  607workers(List, Count) :-
  608    current_prolog_flag(cpu_count, Cores),
  609    Cores > 1,
  610    length(List, Len),
  611    Count is min(Cores,Len),
  612    Count > 1,
  613    !.
  614
  615same_length([], [], []).
  616same_length([_|T1], [_|T2], [_|T3]) :-
  617    same_length(T1, T2, T3).
  618
  619
  620                 /*******************************
  621                 *             FIRST            *
  622                 *******************************/
  623
  624%!  first_solution(-X, :Goals, +Options) is semidet.
  625%
  626%   Try  alternative  solvers  concurrently,   returning  the  first
  627%   answer. In a typical scenario, solving any of the goals in Goals
  628%   is satisfactory for the application to  continue. As soon as one
  629%   of the tried alternatives is  successful,   all  the others are
  630%   killed and first_solution/3 succeeds.
  631%
  632%   For example, if it is unclear whether   it is better to search a
  633%   graph breadth-first or depth-first we can use:
  634%
  635%   ==
  636%   search_graph(Grap, Path) :-
  637%            first_solution(Path, [ breadth_first(Graph, Path),
  638%                                   depth_first(Graph, Path)
  639%                                 ],
  640%                           []).
  641%   ==
  642%
  643%   Options include thread stack-sizes passed   to thread_create, as
  644%   well as the options =on_fail= and   =on_error= that specify what
  645%   to do if a  solver  fails  or   triggers  an  error.  By default
  646%   execution of all  solvers  is  terminated   and  the  result  is
  647%   returned. Sometimes one may wish to  continue. One such scenario
  648%   is if one of the solvers may run  out of resources or one of the
  649%   solvers is known to be incomplete.
  650%
  651%           * on_fail(Action)
  652%           If =stop= (default), terminate all threads and stop with
  653%           the failure.  If =continue=, keep waiting.
  654%           * on_error(Action)
  655%           As above, re-throwing the error if an error appears.
  656%
  657%   @bug    first_solution/3 cannot deal with non-determinism.  There
  658%           is no obvious way to fit non-determinism into it.  If multiple
  659%           solutions are needed wrap the solvers in findall/3.
  660
  661
  662first_solution(X, M:List, Options) :-
  663    message_queue_create(Done),
  664    thread_options(Options, ThreadOptions, RestOptions),
  665    length(List, JobCount),
  666    create_solvers(List, M, X, Done, Solvers, ThreadOptions),
  667    wait_for_one(JobCount, Done, Result, RestOptions),
  668    concur_cleanup(kill, Solvers, [Done]),
  669    (   Result = done(_, Var)
  670    ->  X = Var
  671    ;   Result = error(_, Error)
  672    ->  throw(Error)
  673    ).
  674
  675create_solvers([], _, _, _, [], _).
  676create_solvers([H|T], M, X, Done, [Id|IDs], Options) :-
  677    thread_create(solve(M:H, X, Done), Id, Options),
  678    create_solvers(T, M, X, Done, IDs, Options).
  679
  680solve(Goal, Var, Queue) :-
  681    thread_self(Me),
  682    (   catch(Goal, E, true)
  683    ->  (   var(E)
  684        ->  thread_send_message(Queue, done(Me, Var))
  685        ;   thread_send_message(Queue, error(Me, E))
  686        )
  687    ;   thread_send_message(Queue, failed(Me))
  688    ).
  689
  690wait_for_one(0, _, failed, _) :- !.
  691wait_for_one(JobCount, Queue, Result, Options) :-
  692    thread_get_message(Queue, Msg),
  693    LeftCount is JobCount - 1,
  694    (   Msg = done(_, _)
  695    ->  Result = Msg
  696    ;   Msg = failed(_)
  697    ->  (   option(on_fail(stop), Options, stop)
  698        ->  Result = Msg
  699        ;   wait_for_one(LeftCount, Queue, Result, Options)
  700        )
  701    ;   Msg = error(_, _)
  702    ->  (   option(on_error(stop), Options, stop)
  703        ->  Result = Msg
  704        ;   wait_for_one(LeftCount, Queue, Result, Options)
  705        )
  706    ).
  707
  708
  709%!  thread_options(+Options, -ThreadOptions, -RestOptions) is det.
  710%
  711%   Split the option  list  over   thread(-size)  options  and other
  712%   options.
  713
  714thread_options([], [], []).
  715thread_options([H|T], [H|Th], O) :-
  716    thread_option(H),
  717    !,
  718    thread_options(T, Th, O).
  719thread_options([H|T], Th, [H|O]) :-
  720    thread_options(T, Th, O).
  721
  722thread_option(local(_)).
  723thread_option(global(_)).
  724thread_option(trail(_)).
  725thread_option(argument(_)).
  726thread_option(stack(_)).
  727
  728
  729%!  call_in_thread(+Thread, :Goal) is semidet.
  730%!  call_in_thread(+Thread, :Goal, +Options) is semidet.
  731%
  732%   Run Goal as an interrupt in the context  of Thread. This is based on
  733%   thread_signal/2. If waiting times  out,   we  inject  a stop(Reason)
  734%   exception into Goal. Interrupts can be   nested, i.e., it is allowed
  735%   to run a call_in_thread/2 while the target thread is processing such
  736%   an interrupt.
  737%
  738%   Options are passed to  thread_get_message/3   and  notably allow for
  739%   specifying a timeout. If a timeout   is reached, this predicate will
  740%   attempt to kill Goal in  Thread  and   act  according  to the option
  741%   `on_timeout`.
  742%
  743%     - on_timeout(:Goal)
  744%       If waiting terminates due to a timeout(Time), or deadline(Stamp)
  745%       option, call Goal.  The default is throw(time_limit_exceeded).
  746%
  747%   This predicate is primarily intended   for  debugging and inspection
  748%   tasks.
  749
  750call_in_thread(Thread, Goal) :-
  751    call_in_thread(Thread, Goal, []).
  752
  753call_in_thread(Thread, Goal, _) :-
  754    must_be(callable, Goal),
  755    var(Thread),
  756    !,
  757    instantiation_error(Thread).
  758call_in_thread(Thread, Goal, _) :-
  759    thread_self(Thread),
  760    !,
  761    once(Goal).
  762call_in_thread(Thread, Goal, Options) :-
  763    meta_options(is_meta, Options, Options1),
  764    term_variables(Goal, Vars),
  765    thread_self(Me),
  766    A is random(1 000 000 000),
  767    thread_signal(Thread, run_in_thread(Goal,Vars,A,Me)),
  768    (   catch(thread_get_message(Me, in_thread(A,Result), Options1),
  769              Error,
  770              forward_exception(Thread, A, Error))
  771    ->  (   Result = true(Vars)
  772        ->  true
  773        ;   Result = error(Error)
  774        ->  throw(Error)
  775        ;   fail
  776        )
  777    ;   thread_signal(Thread, kill_task(A, stop(time_limit_exceeded))),
  778        option(on_timeout(Action), Options1, throw(time_limit_exceeded)),
  779        call(Action)
  780    ).
  781
  782is_meta(on_timeout).
  783
  784run_in_thread(Goal, Vars, Id, Sender) :-
  785    (   catch_with_backtrace(call(Goal), Error, true)
  786    ->  (   var(Error)
  787        ->  thread_send_message(Sender, in_thread(Id, true(Vars)))
  788        ;   Error = stop(_)
  789        ->  true
  790        ;   thread_send_message(Sender, in_thread(Id, error(Error)))
  791        )
  792    ;   thread_send_message(Sender, in_thread(Id, false))
  793    ).
  794
  795forward_exception(Thread, Id, Error) :-
  796    kill_with(Error, Kill),
  797    thread_signal(Thread, kill_task(Id, Kill)),
  798    throw(Error).
  799
  800kill_with(time_limit_exceeded, stop(time_limit_exceeded)) :-
  801    !.
  802kill_with(_, stop(interrupt)).
  803
  804kill_task(Id, Exception) :-
  805    prolog_current_frame(Frame),
  806    prolog_frame_attribute(Frame, parent_goal,
  807                           run_in_thread(_Goal, _Vars, Id, _Sender)),
  808    !,
  809    throw(Exception).
  810kill_task(_, _)