1/* Part of sparkle
    2	Copyright 2014-2015 Samer Abdallah (UCL)
    3	 
    4	This program is free software; you can redistribute it and/or
    5	modify it under the terms of the GNU Lesser General Public License
    6	as published by the Free Software Foundation; either version 2
    7	of the License, or (at your option) any later version.
    8
    9	This program is distributed in the hope that it will be useful,
   10	but WITHOUT ANY WARRANTY; without even the implied warranty of
   11	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   12	GNU Lesser General Public License for more details.
   13
   14	You should have received a copy of the GNU Lesser General Public
   15	License along with this library; if not, write to the Free Software
   16	Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
   17*/
   18
   19:- module(concurrency, [concurrent_or/1, concurrent_or/3]).   20
   21:- meta_predicate concurrent_or(-,:,+).   22:- meta_predicate concurrent_or(:).
%! concurrent_or(+Goals:list(callable)) is nondet.
%
%  Simple interface to concurrent_or/3. Equivalent to
%  concurrent_or(Vars,Goals,[]) where Vars is a list of all the
%  variables in Goals.
concurrent_or(Goals) :-
   % Collect every variable occurring in the goal list; these are the
   % variables through which solutions are reported back to the caller.
   term_variables(Goals, AllVars),
   concurrent_or(AllVars, Goals, []).
%! concurrent_or(-Vars, +Goals:list(callable), +Options:list(option)) is nondet.
%
%  Succeeds once for each solution of each goal in Goals, with Vars
%  bound as in that solution (Vars should share variables with Goals).
%  Goals are executed in parallel, one thread per goal. Valid options
%  are:
%
%    * on_error(OnError:oneof([stop,continue]))
%      If OnError=stop, then an exception occurring in any goal stops
%      all goals and is propagated back to and then thrown from the
%      main thread. If OnError=continue, then an exception in a goal
%      terminates only that thread, with an error message printed.
%      The default is stop.
%    * queue_factor(K:natural)
%      Solutions are communicated via a message queue of size
%      K*length(Goals). This limits the extent to which threads
%      compute solutions that have not yet been requested.
%      The default is 1.
%
%  Any remaining options are passed to thread_create/3.
%
%  The queue and the worker threads are acquired in the setup phase of
%  setup_call_cleanup/3 and released in its cleanup phase, so they are
%  reclaimed when the caller commits to a solution, when all solutions
%  are exhausted, or when an exception is raised. If starting the
%  workers itself raises an exception, the workers started so far are
%  stopped and the queue is destroyed before the exception is re-thrown.
concurrent_or(Vars, M:List, Options) :-
   select_option(on_error(OnError), Options, Opts1, stop),
   select_option(queue_factor(K), Opts1, ThreadOpts, 1),
   length(List, JobCount),
   QueueSize is K*JobCount,
   setup_call_cleanup(
      start_workers(M, Vars, QueueSize, ThreadOpts, List, Done, Solvers),
      wait_for_one(JobCount, Done, Vars, OnError),
      stop_workers(Solvers, Done)
   ).

% start_workers(+M, +Vars, +QueueSize, +ThreadOpts, +Goals, -Done, -Solvers)
% Creates the solution queue Done and one worker thread per goal.
% On an exception nothing is left behind: the queue is destroyed here,
% and spawn_workers/6 has already reaped any threads it had started.
start_workers(M, Vars, QueueSize, ThreadOpts, Goals, Done, Solvers) :-
   message_queue_create(Done, [max_size(QueueSize)]),
   catch( spawn_workers(Goals, M, Vars, Done, ThreadOpts, Solvers),
          E,
          ( message_queue_destroy(Done), throw(E) )
        ).

% spawn_workers(+Goals, +M, +Vars, +Done, +ThreadOpts, -Ids)
% Starts one worker per goal, in order. Each recursion level guards the
% rest of the list, so that if a later thread cannot be created, the
% thread started at this level is stopped and joined before the
% exception continues outwards. (Bindings of Ids are undone when an
% exception is caught, hence the per-level bookkeeping.)
spawn_workers([], _, _, _, _, []).
spawn_workers([Goal|Goals], M, Vars, Done, ThreadOpts, [Id|Ids]) :-
   create_worker(M, Vars, Done, ThreadOpts, Goal, Id),
   catch( spawn_workers(Goals, M, Vars, Done, ThreadOpts, Ids),
          E,
          ( reap_worker(Id, Done), throw(E) )
        ).

% reap_worker(+Id, +Done)
% Stops a single worker: signal it, empty the queue so that a worker
% blocked on a full queue can proceed, then wait for it to exit.
reap_worker(Id, Done) :-
   kill_thread(Id),
   drain(Done),
   thread_join(Id, _).

% stop_workers(+Solvers, +Done)
% Cleanup for a fully started set of workers: signal all of them, drain
% the queue, join every thread and finally destroy the queue.
stop_workers(Solvers, Done) :-
   debug(concurrency,'Sending kill signal to workers',[]),
   maplist(kill_thread, Solvers),
   drain(Done),
   debug(concurrency,'Waiting for workers to die.',[]),
   maplist(thread_join, Solvers, _),
   message_queue_destroy(Done).
   65
   66drain(Q) :- thread_get_message(Q,_,[timeout(0)]) -> drain(Q); true.
   67kill_thread(Id) :- catch(thread_signal(Id,throw(abort)),_,true).
   68create_worker(M,V,Q,O,H,Id) :- thread_create(worker(M:H,V,Q),Id,O).
   69
   70wait_for_one(N, Q, X, OnError) :-
   71   succ(N1,N),
   72   thread_get_message(Q, Msg),
   73   (  Msg=success(_,Var) -> (X=Var; wait_for_one(N,Q,X,OnError))
   74   ;  Msg=failed(_)      -> wait_for_one(N1,Q,X,OnError)
   75   ;  Msg=error(_,E)     -> ( OnError=stop -> throw(error(E))
   76                            ; print_message(error,E),
   77                              wait_for_one(N1,Q,X,OnError)
   78                            )
   79   ).
   80
   81worker(Goal,Var,Q) :-
   82   thread_self(Me),
   83   debug(concurrency,'Worker started on ~q.',[Goal]),
   84   (  catch( Goal,E, (thread_send_message(Q,error(Me,E)), throw(error))),
   85      thread_send_message(Q,success(Me,Var)), fail
   86   ;  thread_send_message(Q,failed(Me)),
   87      debug(concurrency,'Worker finished normally.',[])
   88   )